You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC
svn commit: r749202 [6/6] - in /incubator/cassandra/src: ./ org/ org/apache/
org/apache/cassandra/ org/apache/cassandra/db/
Added: incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * This class provides a filter for filtering out columns
+ * that are older than a specific time.
+ *
+ * @author pmalik
+ *
+ */
+class TimeFilter implements IFilter
+{
+    private final long timeLimit_;
+    private boolean isDone_;
+
+    TimeFilter(long timeLimit)
+    {
+        timeLimit_ = timeLimit;
+        isDone_ = false;
+    }
+
+    /**
+     * Copies into a new column family the columns (or, for super columns,
+     * the sub columns) whose timestamp is >= the time limit. Each scan stops
+     * at the first column that is too old and marks the filter as done.
+     *
+     * @param cf name that RowMutation.getColumnAndColumnFamily() splits
+     * @param columnFamily the column family whose columns are examined
+     * @return a new ColumnFamily, named like the input, holding the survivors
+     * @throws UnsupportedOperationException if cf and the column type disagree
+     */
+    public ColumnFamily filter(String cf, ColumnFamily columnFamily)
+    {
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        String cfName = columnFamily.name();
+        ColumnFamily filteredCf = new ColumnFamily(cfName);
+        if( values.length == 1 && !DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+        {
+            for(IColumn column : columnFamily.getAllColumns())
+            {
+                if ( column.timestamp() < timeLimit_ )
+                {
+                    isDone_ = true;
+                    break;
+                }
+                filteredCf.addColumn(column.name(), column);
+            }
+        }
+        else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+        {
+            /*
+             * TODO : For super columns we need to re-visit this issue.
+             * For now this fn will set done to true if we are done with
+             * at least one super column.
+             */
+            for(IColumn column : columnFamily.getAllColumns())
+            {
+                SuperColumn superColumn = (SuperColumn)column;
+                SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+                filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+                for(IColumn subColumn : superColumn.getSubColumns())
+                {
+                    if ( subColumn.timestamp() < timeLimit_ )
+                    {
+                        /*
+                         * Set done where the scan is cut short. Checking a kept-column
+                         * count against filteredSuperColumn's own count presumably never differs.
+                         */
+                        isDone_ = true;
+                        break;
+                    }
+                    filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+                }
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
+        }
+        return filteredCf;
+    }
+
+    /**
+     * Decides whether a column that is being deserialized should be kept.
+     * For a Column the timestamp has not been read at this point, so the
+     * stream is marked, a boolean and the long timestamp are read, and the
+     * stream is reset so the rest of the deserialization logic is unchanged.
+     * Any other IColumn implementation is returned without a check.
+     * @param column the column being deserialized
+     * @param dis stream to peek the timestamp from; needs mark/reset support
+     * @return column, or null (marking the filter done) if it is too old
+     * @throws IOException if reading from or resetting the stream fails
+     */
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {
+        if(column instanceof Column)
+        {
+            dis.mark(1000);
+            dis.readBoolean();
+            long timeStamp = dis.readLong();
+            dis.reset();
+            if( timeStamp < timeLimit_ )
+            {
+                isDone_ = true;
+                return null;
+            }
+        }
+        return column;
+    }
+
+    /** Reports whether a column older than the time limit has been seen. */
+    public boolean isDone()
+    {
+        return isDone_;
+    }
+
+    /** Calls ssTable.next() with a TimeRange from timeLimit_ to the current time. */
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+        return ssTable.next( key, cf, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,109 @@
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+public class TouchMessage
+{
+    /* Shared serializer used to turn TouchMessage instances into bytes. */
+    private static ICompactSerializer<TouchMessage> serializer_ = new TouchMessageSerializer();
+
+    @XmlElement(name="Table")
+    private String table_;
+
+    @XmlElement(name="Key")
+    private String key_;
+
+    @XmlElement(name="fData")
+    private boolean fData_ = true;
+
+    private TouchMessage()
+    {
+    }
+
+    /** Creates a message for the given table and key; fData stays true. */
+    public TouchMessage(String table, String key)
+    {
+        this(table, key, true);
+    }
+
+    /** Creates a message for the given table and key with an explicit fData. */
+    public TouchMessage(String table, String key, boolean fData)
+    {
+        table_ = table;
+        key_ = key;
+        fData_ = fData;
+    }
+
+    /** @return the serializer shared by all TouchMessage instances */
+    static ICompactSerializer<TouchMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes touchMessage and wraps the bytes in a Message built from the
+     * local storage end point, the read stage and the touch verb handler.
+     *
+     * @throws IOException if serialization fails
+     */
+    public static Message makeTouchMessage(TouchMessage touchMessage) throws IOException
+    {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        serializer().serialize(touchMessage, new DataOutputStream( bytes ));
+        return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, new Object[]{bytes.toByteArray()});
+    }
+
+    String table()
+    {
+        return table_;
+    }
+
+    String key()
+    {
+        return key_;
+    }
+
+    public boolean isData()
+    {
+        return fData_;
+    }
+}
+
+class TouchMessageSerializer implements ICompactSerializer<TouchMessage>
+{
+    /**
+     * Writes the table and key as UTF strings followed by the data flag.
+     * @throws IOException if writing to dos fails
+     */
+    public void serialize(TouchMessage tm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(tm.table());
+        dos.writeUTF(tm.key());
+        dos.writeBoolean(tm.isData());
+    }
+
+    /**
+     * Reads back, in the order serialize() wrote them, the table, the key
+     * and the data flag, and builds a TouchMessage from them.
+     */
+    public TouchMessage deserialize(DataInputStream dis) throws IOException
+    {
+        // Java evaluates arguments left to right, matching the write order.
+        return new TouchMessage(dis.readUTF(), dis.readUTF(), dis.readBoolean());
+    }
+
+    /** Empty entry point; it performs no work. */
+    public static void main(String[] args)
+    {
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,58 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class TouchVerbHandler implements IVerbHandler
+{
+    private static class ReadContext
+    {
+        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+    }
+
+    /* Log under this handler's own class rather than ReadVerbHandler. */
+    private static Logger logger_ = Logger.getLogger( TouchVerbHandler.class );
+    /*
+     * One read context per thread. A plain ThreadLocal is used because an
+     * InheritableThreadLocal would share the parent's context with children.
+     */
+    private static ThreadLocal<ReadContext> tls_ = new ThreadLocal<ReadContext>();
+
+    /**
+     * Deserializes a TouchMessage from the first element of the message body
+     * and calls Table.touch() with its key and data flag. An IOException is
+     * logged at info level and not rethrown.
+     */
+    public void doVerb(Message message)
+    {
+        byte[] body = (byte[])message.getMessageBody()[0];
+        ReadContext readCtx = tls_.get();
+        if ( readCtx == null )
+        {
+            readCtx = new ReadContext();
+            tls_.set(readCtx);
+        }
+        readCtx.bufIn_.reset(body, body.length);
+        try
+        {
+            TouchMessage touchMessage = TouchMessage.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(touchMessage.table());
+            table.touch(touchMessage.key(), touchMessage.isData());
+        }
+        catch ( IOException ex)
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    }
+
+    /** Empty entry point; it performs no work. */
+    public static void main(String[] args)
+    {
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+/**
+ * Type tags and the single-byte codes (1 through 9) that stand for them;
+ * toByte() and fromByte() convert between the two.
+ */
+public enum TypeInfo
+{
+    BYTE,
+    CHAR,
+    SHORT,
+    INT,
+    LONG,
+    DOUBLE,
+    FLOAT,
+    STRING,
+    BLOB;
+
+    /**
+     * Maps a type tag to its byte code.
+     *
+     * @param ti the type tag; must not be null
+     * @return a code from 1 (BYTE) to 9 (BLOB)
+     * @throws NullPointerException if ti is null
+     */
+    public static byte toByte(TypeInfo ti)
+    {
+        switch(ti)
+        {
+            case BYTE:
+                return 1;
+
+            case CHAR:
+                return 2;
+
+            case SHORT:
+                return 3;
+
+            case INT:
+                return 4;
+
+            case LONG:
+                return 5;
+
+            case DOUBLE:
+                return 6;
+
+            case FLOAT:
+                return 7;
+
+            case STRING:
+                return 8;
+
+            case BLOB:
+                return 9;
+        }
+        /* Not reached for the nine constants above. */
+        return 0;
+    }
+
+    /**
+     * Maps a byte code back to its type tag.
+     * This is the inverse of toByte() for the codes 1 through 9.
+     *
+     * @param b the byte code
+     * @return the matching tag, or null when b is outside 1 through 9
+     */
+    public static TypeInfo fromByte(byte b)
+    {
+        switch(b)
+        {
+            case 1:
+                return TypeInfo.BYTE;
+
+            case 2:
+                return TypeInfo.CHAR;
+
+            case 3:
+                return TypeInfo.SHORT;
+
+            case 4:
+                return TypeInfo.INT;
+
+            case 5:
+                return TypeInfo.LONG;
+
+            case 6:
+                return TypeInfo.DOUBLE;
+
+            case 7:
+                return TypeInfo.FLOAT;
+
+            case 8:
+                return TypeInfo.STRING;
+
+            case 9:
+                return TypeInfo.BLOB;
+
+            default:
+                return null;
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+/*
+ * This message is sent back by the row mutation verb handler
+ * and basically specifies if the write succeeded or not for a particular
+ * key in a table
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class WriteResponseMessage implements Serializable
+{
+    private static ICompactSerializer<WriteResponseMessage> serializer_ = new WriteResponseMessageSerializer();
+
+    @XmlElement(name = "Table")
+    private String table_;
+    @XmlElement(name = "key")
+    private String key_;
+    @XmlElement(name = "Status")
+    private boolean status_;
+
+    private WriteResponseMessage()
+    {
+    }
+
+    /** Records the table, the key and whether the write succeeded. */
+    public WriteResponseMessage(String table, String key, boolean bVal)
+    {
+        table_ = table;
+        key_ = key;
+        status_ = bVal;
+    }
+
+    /** @return the serializer shared by all instances */
+    static ICompactSerializer<WriteResponseMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes writeResponseMessage and wraps the bytes in a Message built from
+     * the local storage end point, the response stage and response verb handler.
+     * @throws IOException if serialization fails
+     */
+    public static Message makeWriteResponseMessage(WriteResponseMessage writeResponseMessage) throws IOException
+    {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        serializer().serialize(writeResponseMessage, new DataOutputStream( bytes ));
+        return new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bytes.toByteArray()});
+    }
+
+    public String table()
+    {
+        return table_;
+    }
+
+    public String key()
+    {
+        return key_;
+    }
+
+    public boolean isSuccess()
+    {
+        return status_;
+    }
+}
+
+class WriteResponseMessageSerializer implements ICompactSerializer<WriteResponseMessage>
+{
+    /** Writes the table and key as UTF strings, then the status flag. */
+    public void serialize(WriteResponseMessage wm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(wm.table());
+        dos.writeUTF(wm.key());
+        dos.writeBoolean(wm.isSuccess());
+    }
+
+    /** Reads the table, key and status flag in the order they were written. */
+    public WriteResponseMessage deserialize(DataInputStream dis) throws IOException
+    {
+        // Arguments are evaluated left to right, so reads follow the write order.
+        return new WriteResponseMessage(dis.readUTF(), dis.readUTF(), dis.readBoolean());
+    }
+}
\ No newline at end of file