You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC

svn commit: r749202 [6/6] - in /incubator/cassandra/src: ./ org/ org/apache/ org/apache/cassandra/ org/apache/cassandra/db/

Added: incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * This class provides a filter for filtering out columns
+ * that are older than a specific time.
+ *
+ * Filtering stops at the first column whose timestamp is below the time
+ * limit; once that has happened {@link #isDone()} reports true.
+ * NOTE(review): stopping at the first stale column presumably relies on the
+ * columns being handed over newest first -- confirm against the callers.
+ *
+ * @author pmalik
+ *
+ */
+class TimeFilter implements IFilter
+{
+    /* Columns whose timestamp is smaller than this value are dropped. */
+    private long timeLimit_;
+    /* Flipped to true as soon as a column older than timeLimit_ is seen. */
+    private boolean isDone_;
+
+    TimeFilter(long timeLimit)
+    {
+        timeLimit_ = timeLimit;
+        isDone_ = false;
+    }
+
+    /**
+     * Copies the leading run of sufficiently recent columns of the given
+     * column family into a newly created column family of the same name.
+     *
+     * If cf splits into one component and the column family is not of type
+     * "Super", columns are copied until the first one whose timestamp is
+     * below the time limit. If cf splits into two components and the column
+     * family is of type "Super", the same is done with the sub columns of
+     * every super column; every super column is added to the result, even
+     * when none of its sub columns is accepted.
+     *
+     * @param cf specification that is split with
+     *        RowMutation.getColumnAndColumnFamily
+     * @param columnFamily the column family whose columns are filtered
+     * @return a new column family holding only the accepted columns
+     * @throws UnsupportedOperationException if the number of components in
+     *         cf does not fit the type of the column family
+     */
+    public ColumnFamily filter(String cf, ColumnFamily columnFamily)
+    {
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        String cfName = columnFamily.name();
+        ColumnFamily filteredCf = new ColumnFamily(cfName);
+        if ( values.length == 1 && !DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+        {
+            Collection<IColumn> columns = columnFamily.getAllColumns();
+            int accepted = 0;
+            for ( IColumn column : columns )
+            {
+                if ( column.timestamp() < timeLimit_ )
+                {
+                    break;
+                }
+                filteredCf.addColumn(column.name(), column);
+                ++accepted;
+            }
+            /* Fewer accepted than present means we hit a column that is too old. */
+            if ( accepted < columns.size() )
+            {
+                isDone_ = true;
+            }
+        }
+        else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+        {
+            /*
+             * TODO : For super columns we need to re-visit this issue.
+             * For now this fn will set done to true if we are done with
+             * at least one super column.
+             */
+            for ( IColumn column : columnFamily.getAllColumns() )
+            {
+                SuperColumn superColumn = (SuperColumn)column;
+                SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+                filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+                Collection<IColumn> subColumns = superColumn.getSubColumns();
+                int accepted = 0;
+                for ( IColumn subColumn : subColumns )
+                {
+                    if ( subColumn.timestamp() < timeLimit_ )
+                    {
+                        break;
+                    }
+                    filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+                    ++accepted;
+                }
+                /*
+                 * Compare against the unfiltered sub columns, like the standard
+                 * branch does. The filtered super column only ever receives the
+                 * accepted sub columns, so comparing against its column count
+                 * cannot tell that filtering stopped early.
+                 */
+                if ( accepted < subColumns.size() )
+                {
+                    isDone_ = true;
+                }
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
+        }
+        return filteredCf;
+    }
+
+    /**
+     * Decides, before a column is fully deserialized, whether it passes
+     * the filter.
+     *
+     * If its a Column instance we need the timestamp to verify if it should
+     * be filtered, but at this point the timestamp is not read yet. So a
+     * boolean and the long timestamp are read from the stream and the stream
+     * is reset to the marked position, so that the rest of the
+     * deserialization logic does not change. Any other IColumn is returned
+     * as is.
+     *
+     * @param column the column that is being deserialized
+     * @param dis stream to peek into; mark and reset are called on it
+     * @return column, or null if its timestamp is below the time limit
+     * @throws IOException if reading from or resetting the stream fails
+     */
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {
+        if ( column instanceof Column )
+        {
+            dis.mark(1000);
+            dis.readBoolean();
+            long timeStamp = dis.readLong();
+            dis.reset();
+            if ( timeStamp < timeLimit_ )
+            {
+                isDone_ = true;
+                return null;
+            }
+        }
+        return column;
+    }
+
+    /**
+     * @return true once any of the filter methods has come across a column
+     *         whose timestamp is below the time limit
+     */
+    public boolean isDone()
+    {
+        return isDone_;
+    }
+
+    /**
+     * Delegates to SSTable.next with a time range that starts at the time
+     * limit and ends at the current system time.
+     *
+     * @param key key passed through to the SSTable
+     * @param cf column family specification passed through to the SSTable
+     * @param ssTable the SSTable to read from
+     * @return whatever SSTable.next returns for the key, cf and time range
+     * @throws IOException if the SSTable read fails
+     */
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+        IndexHelper.TimeRange timeRange = new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() );
+        return ssTable.next( key, cf, timeRange );
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,109 @@
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Message that names a table and a key together with a boolean data flag.
+ * TouchVerbHandler hands these three values to Table.touch. The wire format
+ * is produced and consumed by TouchMessageSerializer.
+ */
+public class TouchMessage
+{
+    /* The serializer shared by all touch messages. */
+    private static ICompactSerializer<TouchMessage> serializer_ = new TouchMessageSerializer();
+
+    @XmlElement(name="Table")
+    private String table_;
+
+    @XmlElement(name="Key")
+    private String key_;
+
+    /* Data flag; true unless a different value is given to the constructor. */
+    @XmlElement(name="fData")
+    private boolean fData_ = true;
+
+    /**
+     * @return the serializer used to write and read touch messages
+     */
+    static ICompactSerializer<TouchMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Wraps a serialized touch message into a Message that originates at
+     * the local storage end point and is addressed to the read stage and
+     * the touch verb handler.
+     *
+     * @param touchMessage the touch message to serialize into the body
+     * @return the message whose body holds the serialized bytes
+     * @throws IOException if serializing the touch message fails
+     */
+    public static Message makeTouchMessage(TouchMessage touchMessage) throws IOException
+    {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        serializer_.serialize(touchMessage, new DataOutputStream( bytes ));
+        return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, new Object[]{bytes.toByteArray()});
+    }
+
+    /* No-argument constructor; leaves table and key unset. */
+    private TouchMessage()
+    {
+    }
+
+    /**
+     * Creates a touch message with the data flag set to true.
+     *
+     * @param table name of the table
+     * @param key the key
+     */
+    public TouchMessage(String table, String key)
+    {
+        this(table, key, true);
+    }
+
+    /**
+     * @param table name of the table
+     * @param key the key
+     * @param fData value of the data flag
+     */
+    public TouchMessage(String table, String key, boolean fData)
+    {
+        table_ = table;
+        key_ = key;
+        fData_ = fData;
+    }
+
+    /** @return the table name */
+    String table()
+    {
+        return table_;
+    }
+
+    /** @return the key */
+    String key()
+    {
+        return key_;
+    }
+
+    /** @return the data flag */
+    public boolean isData()
+    {
+        return fData_;
+    }
+}
+
+/**
+ * Serializer for TouchMessage. On the wire a message is the table name
+ * (UTF), followed by the key (UTF), followed by the data flag (boolean).
+ */
+class TouchMessageSerializer implements ICompactSerializer<TouchMessage>
+{
+    /**
+     * Writes table, key and data flag of the message, in that order.
+     *
+     * @param tm the message to write
+     * @param dos the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    public void serialize(TouchMessage tm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF( tm.table() );
+        dos.writeUTF( tm.key() );
+        dos.writeBoolean( tm.isData() );
+    }
+
+    /**
+     * Reads table, key and data flag in the order serialize wrote them.
+     *
+     * @param dis the stream to read from
+     * @return the message rebuilt from the stream
+     * @throws IOException if reading from the stream fails
+     */
+    public TouchMessage deserialize(DataInputStream dis) throws IOException
+    {
+        /* Arguments are evaluated left to right, i.e. in wire order. */
+        return new TouchMessage( dis.readUTF(), dis.readUTF(), dis.readBoolean() );
+    }
+
+    /**
+     * Empty entry point.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args)
+    {
+        // Nothing to do.
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,58 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Verb handler for touch requests. It deserializes the TouchMessage found
+ * in the message body and calls Table.touch with the key and data flag of
+ * that message on the table the message names.
+ */
+public class TouchVerbHandler implements IVerbHandler
+{
+    /* Scratch state that a thread reuses across calls to doVerb. */
+    private static class ReadContext
+    {
+        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+    }
+
+    /* Log under this class, so that messages are attributed to the touch handler. */
+    private static Logger logger_ = Logger.getLogger( TouchVerbHandler.class );
+
+    /*
+     * We use this so that each thread reuses its own read context (and the
+     * input buffer in it) instead of allocating one per message. This is a
+     * plain ThreadLocal on purpose: with an InheritableThreadLocal a child
+     * thread starts out with the value of its parent, so both threads would
+     * end up resetting and reading the very same buffer.
+     */
+    private static ThreadLocal<ReadContext> tls_ = new ThreadLocal<ReadContext>()
+    {
+        protected ReadContext initialValue()
+        {
+            return new ReadContext();
+        }
+    };
+
+    /**
+     * Handles one touch request. An IOException raised while processing the
+     * request is logged and not propagated.
+     *
+     * @param message message whose first body element is the byte array of
+     *        a serialized TouchMessage
+     */
+    public void doVerb(Message message)
+    {
+        byte[] body = (byte[])message.getMessageBody()[0];
+        /* Obtain the Read Context of this thread from TLS */
+        ReadContext readCtx = tls_.get();
+        readCtx.bufIn_.reset(body, body.length);
+
+        try
+        {
+            TouchMessage touchMessage = TouchMessage.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(touchMessage.table());
+            table.touch(touchMessage.key(), touchMessage.isData());
+        }
+        catch ( IOException ex )
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    }
+
+    /**
+     * Empty entry point.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args)
+    {
+        // Nothing to do.
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+/**
+ * Enumeration of value types. Every type has a fixed byte code (1 to 9)
+ * that toByte produces and fromByte understands.
+ */
+public enum TypeInfo
+{
+    BYTE(1),
+    CHAR(2),
+    SHORT(3),
+    INT(4),
+    LONG(5),
+    DOUBLE(6),
+    FLOAT(7),
+    STRING(8),
+    BLOB(9);
+
+    /* The byte code of this type. */
+    private final byte code_;
+
+    private TypeInfo(int code)
+    {
+        code_ = (byte)code;
+    }
+
+    /**
+     * Maps a type to its byte code.
+     *
+     * @param ti the type; must not be null
+     * @return the byte code of the type, a value from 1 to 9
+     * @throws NullPointerException if ti is null
+     */
+    public static byte toByte(TypeInfo ti)
+    {
+        return ti.code_;
+    }
+
+    /**
+     * Maps a byte code back to its type.
+     *
+     * @param b the byte code
+     * @return the type with that byte code, or null if there is none
+     */
+    public static TypeInfo fromByte(byte b)
+    {
+        for ( TypeInfo candidate : values() )
+        {
+            if ( candidate.code_ == b )
+            {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+/*
+ * This message is sent back by the row mutation verb handler
+ * and specifies whether the write succeeded or not for a particular
+ * key in a table.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class WriteResponseMessage implements Serializable
+{
+    /* The serializer shared by all write response messages. */
+    private static ICompactSerializer<WriteResponseMessage> serializer_ = new WriteResponseMessageSerializer();
+
+    @XmlElement(name = "Table")
+    private String table_;
+
+    @XmlElement(name = "key")
+    private String key_;
+
+    @XmlElement(name = "Status")
+    private boolean status_;
+
+    /**
+     * @return the serializer used to write and read write response messages
+     */
+    static ICompactSerializer<WriteResponseMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Wraps a serialized write response into a Message that originates at
+     * the local storage end point and is addressed to the response stage
+     * and the response verb handler of the messaging service.
+     *
+     * @param writeResponseMessage the response to serialize into the body
+     * @return the message whose body holds the serialized bytes
+     * @throws IOException if serializing the response fails
+     */
+    public static Message makeWriteResponseMessage(WriteResponseMessage writeResponseMessage) throws IOException
+    {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        serializer_.serialize(writeResponseMessage, new DataOutputStream( bytes ));
+        return new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bytes.toByteArray()});
+    }
+
+    /* No-argument constructor; leaves table and key unset and the status false. */
+    private WriteResponseMessage()
+    {
+    }
+
+    /**
+     * @param table name of the table
+     * @param key the key
+     * @param bVal the status reported by isSuccess
+     */
+    public WriteResponseMessage(String table, String key, boolean bVal)
+    {
+        table_ = table;
+        key_ = key;
+        status_ = bVal;
+    }
+
+    /** @return the table name */
+    public String table()
+    {
+        return table_;
+    }
+
+    /** @return the key */
+    public String key()
+    {
+        return key_;
+    }
+
+    /** @return the status this message was created with */
+    public boolean isSuccess()
+    {
+        return status_;
+    }
+}
+
+/**
+ * Serializer for WriteResponseMessage. On the wire a message is the table
+ * name (UTF), followed by the key (UTF), followed by the status (boolean).
+ */
+class WriteResponseMessageSerializer implements ICompactSerializer<WriteResponseMessage>
+{
+    /**
+     * Writes table, key and status of the message, in that order.
+     *
+     * @param wm the message to write
+     * @param dos the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    public void serialize(WriteResponseMessage wm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF( wm.table() );
+        dos.writeUTF( wm.key() );
+        dos.writeBoolean( wm.isSuccess() );
+    }
+
+    /**
+     * Reads table, key and status in the order serialize wrote them.
+     *
+     * @param dis the stream to read from
+     * @return the message rebuilt from the stream
+     * @throws IOException if reading from the stream fails
+     */
+    public WriteResponseMessage deserialize(DataInputStream dis) throws IOException
+    {
+        /* Arguments are evaluated left to right, i.e. in wire order. */
+        return new WriteResponseMessage( dis.readUTF(), dis.readUTF(), dis.readBoolean() );
+    }
+}
\ No newline at end of file