You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [8/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/j...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,59 +1,59 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.service.*;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.net.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadRepairVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);    
-    
-    public void doVerb(Message message)
-    {          
-        byte[] body = message.getMessageBody();
-        DataInputBuffer buffer = new DataInputBuffer();
-        buffer.reset(body, body.length);        
-        
-        try
-        {
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
-            RowMutation rm = rmMsg.getRowMutation();
-            rm.apply();                                   
-        }
-        catch ( IOException e )
-        {
-            if (logger_.isDebugEnabled())
-                logger_.debug(LogUtil.throwableToString(e));            
-        }        
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Verb handler for read-repair messages: the message body is deserialized
+ * into a {@link RowMutationMessage} and the mutation it carries is applied.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadRepairVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);
+
+    /**
+     * Deserializes the row mutation contained in the message body and applies it.
+     *
+     * An {@link IOException} raised while deserializing or applying the mutation
+     * is not propagated to the caller; it is logged at error level (together with
+     * its stack trace) so that a failed repair is visible in the logs instead of
+     * disappearing when debug logging is off.
+     *
+     * @param message message whose body holds a serialized RowMutationMessage
+     */
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        DataInputBuffer buffer = new DataInputBuffer();
+        buffer.reset(body, body.length);
+
+        try
+        {
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+            RowMutation rm = rmMsg.getRowMutation();
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            // Best effort: the handler keeps running, but the failure must not be silent.
+            logger_.error("Failed to apply read repair mutation", e);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Thu Jul 30 15:30:21 2009
@@ -1,137 +1,137 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.commons.lang.ArrayUtils;
-
-
-/*
- * The read response message is sent by the server when reading data 
- * this encapsulates the tablename and the row that has been read.
- * The table name is needed so that we can use it to create repairs.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class ReadResponse implements Serializable 
-{
-private static ICompactSerializer<ReadResponse> serializer_;
-
-    static
-    {
-        serializer_ = new ReadResponseSerializer();
-    }
-
-    public static ICompactSerializer<ReadResponse> serializer()
-    {
-        return serializer_;
-    }
-    
-	public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
-    {
-    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());         
-        return message;
-    }
-	
-	private Row row_;
-	private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
-    private boolean isDigestQuery_ = false;
-
-	public ReadResponse(byte[] digest )
-    {
-        assert digest != null;
-		digest_= digest;
-	}
-
-	public ReadResponse(Row row)
-    {
-		row_ = row;
-	}
-
-	public Row row() 
-    {
-		return row_;
-    }
-        
-	public byte[] digest() 
-    {
-		return digest_;
-	}
-
-	public boolean isDigestQuery()
-    {
-    	return isDigestQuery_;
-    }
-    
-    public void setIsDigestQuery(boolean isDigestQuery)
-    {
-    	isDigestQuery_ = isDigestQuery;
-    }
-}
-
-class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
-{
-	public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
-	{
-        dos.writeInt(rm.digest().length);
-        dos.write(rm.digest());
-        dos.writeBoolean(rm.isDigestQuery());
-        
-        if( !rm.isDigestQuery() && rm.row() != null )
-        {            
-            Row.serializer().serialize(rm.row(), dos);
-        }				
-	}
-	
-    public ReadResponse deserialize(DataInputStream dis) throws IOException
-    {
-        int digestSize = dis.readInt();
-        byte[] digest = new byte[digestSize];
-        dis.read(digest, 0 , digestSize);
-        boolean isDigest = dis.readBoolean();
-        
-        Row row = null;
-        if ( !isDigest )
-        {
-            row = Row.serializer().deserialize(dis);
-        }
-		
-		ReadResponse rmsg = null;
-    	if( isDigest  )
-        {
-    		rmsg =  new ReadResponse(digest);
-        }
-    	else
-        {
-    		rmsg =  new ReadResponse(row);
-        }
-        rmsg.setIsDigestQuery(isDigest);
-    	return rmsg;
-    } 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.commons.lang.ArrayUtils;
+
+
+/*
+ * Response sent back for a read. An instance carries either the row that was
+ * read or a digest (byte array), plus a flag recording whether it answers a
+ * digest query.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponse implements Serializable
+{
+    /* Shared serializer instance handed out by serializer(). */
+    private static ICompactSerializer<ReadResponse> serializer_ = new ReadResponseSerializer();
+
+    private Row row_;
+    private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
+    private boolean isDigestQuery_ = false;
+
+    /* Creates a response that carries only a digest; the digest must not be null. */
+    public ReadResponse(byte[] digest)
+    {
+        assert digest != null;
+        digest_ = digest;
+    }
+
+    /* Creates a response that carries a row; the digest stays the empty array. */
+    public ReadResponse(Row row)
+    {
+        row_ = row;
+    }
+
+    public static ICompactSerializer<ReadResponse> serializer()
+    {
+        return serializer_;
+    }
+
+    /*
+     * Serializes the given response and wraps the bytes in a Message addressed
+     * from the local storage endpoint to the response stage / response verb handler.
+     */
+    public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
+    {
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        serializer().serialize(readResponse, new DataOutputStream(bytesOut));
+        return new Message(StorageService.getLocalStorageEndPoint(),
+                           MessagingService.responseStage_,
+                           MessagingService.responseVerbHandler_,
+                           bytesOut.toByteArray());
+    }
+
+    public Row row()
+    {
+        return row_;
+    }
+
+    public byte[] digest()
+    {
+        return digest_;
+    }
+
+    public boolean isDigestQuery()
+    {
+        return isDigestQuery_;
+    }
+
+    public void setIsDigestQuery(boolean isDigestQuery)
+    {
+        isDigestQuery_ = isDigestQuery;
+    }
+}
+
+/*
+ * Compact serializer for ReadResponse.
+ * Layout written by serialize(): int digest length, digest bytes, boolean
+ * digest-query flag and then, only for a non-digest response whose row is
+ * not null, the serialized Row.
+ */
+class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
+{
+    /*
+     * Writes the response to the stream in the layout described on the class.
+     */
+    public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
+    {
+        byte[] digest = rm.digest();
+        dos.writeInt(digest.length);
+        dos.write(digest);
+        dos.writeBoolean(rm.isDigestQuery());
+
+        // NOTE(review): a non-digest response with a null row writes no row here,
+        // while deserialize() always reads a row when the flag is false - confirm
+        // that callers never serialize a non-digest response without a row.
+        if (!rm.isDigestQuery() && rm.row() != null)
+        {
+            Row.serializer().serialize(rm.row(), dos);
+        }
+    }
+
+    /*
+     * Reads a response previously written by serialize().
+     * Throws IOException if the encoded digest length is negative, and
+     * EOFException (an IOException) if the stream ends before the digest
+     * has been read completely.
+     */
+    public ReadResponse deserialize(DataInputStream dis) throws IOException
+    {
+        int digestSize = dis.readInt();
+        if (digestSize < 0)
+        {
+            throw new IOException("Invalid digest size " + digestSize);
+        }
+        byte[] digest = new byte[digestSize];
+        // read() may return after filling only part of the array; readFully()
+        // blocks until every digest byte has arrived, keeping the stream aligned.
+        dis.readFully(digest, 0, digestSize);
+        boolean isDigest = dis.readBoolean();
+
+        ReadResponse rmsg;
+        if (isDigest)
+        {
+            rmsg = new ReadResponse(digest);
+        }
+        else
+        {
+            rmsg = new ReadResponse(Row.serializer().deserialize(dis));
+        }
+        rmsg.setIsDigestQuery(isDigest);
+        return rmsg;
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,123 +1,123 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadVerbHandler implements IVerbHandler
-{
-    protected static class ReadContext
-    {
-        protected DataInputBuffer bufIn_ = new DataInputBuffer();
-        protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
-    }
-
-    private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
-    /* We use this so that we can reuse the same row mutation context for the mutation. */
-    private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
-    
-    protected static ReadVerbHandler.ReadContext getCurrentReadContext()
-    {
-        return tls_.get();
-    }
-    
-    protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
-    {
-        tls_.set(readContext);
-    }
-
-    public void doVerb(Message message)
-    {
-        byte[] body = message.getMessageBody();
-        /* Obtain a Read Context from TLS */
-        ReadContext readCtx = tls_.get();
-        if ( readCtx == null )
-        {
-            readCtx = new ReadContext();
-            tls_.set(readCtx);
-        }
-        readCtx.bufIn_.reset(body, body.length);
-
-        try
-        {
-            ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
-            Table table = Table.open(readCommand.table);
-            Row row = null;
-            row = readCommand.getRow(table);
-            ReadResponse readResponse = null;
-            if (readCommand.isDigestQuery())
-            {
-                readResponse = new ReadResponse(row.digest());
-            }
-            else
-            {
-                readResponse = new ReadResponse(row);
-            }
-            readResponse.setIsDigestQuery(readCommand.isDigestQuery());
-            /* serialize the ReadResponseMessage. */
-            readCtx.bufOut_.reset();
-
-            ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
-
-            byte[] bytes = new byte[readCtx.bufOut_.getLength()];
-            System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
-
-            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
-
-            /* Do read repair if header of the message says so */
-            if (message.getHeader(ReadCommand.DO_REPAIR) != null)
-            {
-                doReadRepair(row, readCommand);
-            }
-        }
-        catch (IOException ex)
-        {
-            throw new RuntimeException(ex);
-        }
-    }
-    
-    private void doReadRepair(Row row, ReadCommand readCommand)
-    {
-        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
-        /* Remove the local storage endpoint from the list. */ 
-        endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            
-        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-            StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
-    }     
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Verb handler for read requests: deserializes a {@link ReadCommand} from the
+ * message body, reads the row, sends a {@link ReadResponse} back to the sender
+ * and, when the message carries the {@code ReadCommand.DO_REPAIR} header,
+ * starts a consistency check against the other live endpoints.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadVerbHandler implements IVerbHandler
+{
+    /** Pair of input/output buffers that a thread reuses from one read to the next. */
+    protected static class ReadContext
+    {
+        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+        protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
+    }
+
+    private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
+    /*
+     * Lets each thread reuse one ReadContext across reads. A plain ThreadLocal is
+     * used on purpose: with InheritableThreadLocal a thread created by a handler
+     * thread would inherit a reference to its parent's mutable buffers.
+     */
+    private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new ThreadLocal<ReadVerbHandler.ReadContext>();
+
+    /** Returns the calling thread's read context, or null if none has been set yet. */
+    protected static ReadVerbHandler.ReadContext getCurrentReadContext()
+    {
+        return tls_.get();
+    }
+
+    /** Installs the given read context for the calling thread. */
+    protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
+    {
+        tls_.set(readContext);
+    }
+
+    /**
+     * Executes the read described by the message and replies to its sender.
+     *
+     * @param message message whose body holds a serialized ReadCommand
+     * @throws RuntimeException wrapping any IOException raised while handling the read
+     */
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        /* Obtain a Read Context from TLS, creating it on this thread's first read. */
+        ReadContext readCtx = tls_.get();
+        if ( readCtx == null )
+        {
+            readCtx = new ReadContext();
+            tls_.set(readCtx);
+        }
+        readCtx.bufIn_.reset(body, body.length);
+
+        try
+        {
+            ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(readCommand.table);
+            Row row = readCommand.getRow(table);
+
+            /* A digest query is answered with the row's digest, any other query with the row itself. */
+            ReadResponse readResponse;
+            if (readCommand.isDigestQuery())
+            {
+                readResponse = new ReadResponse(row.digest());
+            }
+            else
+            {
+                readResponse = new ReadResponse(row);
+            }
+            readResponse.setIsDigestQuery(readCommand.isDigestQuery());
+
+            /* serialize the ReadResponseMessage. */
+            readCtx.bufOut_.reset();
+            ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
+
+            /* Copy the bytes out: the output buffer is reused by the next read on this thread. */
+            byte[] bytes = new byte[readCtx.bufOut_.getLength()];
+            System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
+
+            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+
+            /* Do read repair if header of the message says so */
+            if (message.getHeader(ReadCommand.DO_REPAIR) != null)
+            {
+                doReadRepair(row, readCommand);
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Asks StorageService to run a consistency check for the row against the live
+     * endpoints for the key, excluding the local one. Nothing happens when no other
+     * endpoint remains or when DatabaseDescriptor.getConsistencyCheck() is false.
+     */
+    private void doReadRepair(Row row, ReadCommand readCommand)
+    {
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+        /* Remove the local storage endpoint from the list. */
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+
+        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jul 30 15:30:21 2009
@@ -1,66 +1,66 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.io.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RecoveryManager
-{
-    private static RecoveryManager instance_;
-    private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
-
-    public synchronized static RecoveryManager instance() throws IOException
-    {
-        if (instance_ == null)
-        {
-            instance_ = new RecoveryManager();
-        }
-        return instance_;
-    }
-
-    public static File[] getListofCommitLogs()
-    {
-        String directory = DatabaseDescriptor.getLogFileLocation();
-        File file = new File(directory);
-        return file.listFiles();
-    }
-
-    public static void doRecovery() throws IOException
-    {
-        File[] files = getListofCommitLogs();
-        if (files.length == 0)
-            return;
-
-        Arrays.sort(files, new FileUtils.FileComparator());
-        logger_.info("Replaying " + StringUtils.join(files, ", "));
-        new CommitLog(true).recover(files);
-        FileUtils.delete(files);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+
+/**
+ * Replays the commit log files found in the configured log file location.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class RecoveryManager
+{
+    private static RecoveryManager instance_;
+    private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
+
+    /** Returns the lazily created singleton; creation is guarded by the class lock. */
+    public synchronized static RecoveryManager instance() throws IOException
+    {
+        if (instance_ == null)
+        {
+            instance_ = new RecoveryManager();
+        }
+        return instance_;
+    }
+
+    /**
+     * Lists the entries of the directory named by DatabaseDescriptor.getLogFileLocation().
+     *
+     * @return the directory entries, or null when File.listFiles() returns null
+     *         (the path is not a directory or an I/O error occurred)
+     */
+    public static File[] getListofCommitLogs()
+    {
+        String directory = DatabaseDescriptor.getLogFileLocation();
+        File file = new File(directory);
+        return file.listFiles();
+    }
+
+    /**
+     * Sorts the commit log files, replays them through a CommitLog and then
+     * deletes them. Returns without doing anything when there are no files or
+     * when the commit log directory cannot be listed.
+     */
+    public static void doRecovery() throws IOException
+    {
+        File[] files = getListofCommitLogs();
+        // listFiles() yields null for a missing/unreadable directory; treat that as "nothing to replay".
+        if (files == null || files.length == 0)
+            return;
+
+        Arrays.sort(files, new FileUtils.FileComparator());
+        logger_.info("Replaying " + StringUtils.join(files, ", "));
+        new CommitLog(true).recover(files);
+        FileUtils.delete(files);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Thu Jul 30 15:30:21 2009
@@ -1,231 +1,231 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.Arrays;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class Row
-{
-    private static Logger logger_ = Logger.getLogger(Row.class);
-    private String table_;
-    private static RowSerializer serializer = new RowSerializer();
-
-    static RowSerializer serializer()
-    {
-        return serializer;
-    }
-
-    public Row(String table, String key) {
-        assert table != null;
-        this.table_ = table;
-        this.key_ = key;
-    }
-
-    // only for use by RMVH
-    Row()
-    {
-    }
-
-    public String getTable() {
-        return table_;
-    }
-
-    private String key_;
-
-    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
-
-    public String key()
-    {
-        return key_;
-    }
-
-    void setKey(String key)
-    {
-        key_ = key;
-    }
-
-    public void setTable(String table)
-    {
-        table_ = table;
-    }
-
-    public Set<String> getColumnFamilyNames()
-    {
-        return columnFamilies_.keySet();
-    }
-
-    public Collection<ColumnFamily> getColumnFamilies()
-    {
-        return columnFamilies_.values();
-    }
-
-    public ColumnFamily getColumnFamily(String cfName)
-    {
-        return columnFamilies_.get(cfName);
-    }
-
-    void addColumnFamily(ColumnFamily columnFamily)
-    {
-        columnFamilies_.put(columnFamily.name(), columnFamily);
-    }
-
-    void removeColumnFamily(ColumnFamily columnFamily)
-    {
-        columnFamilies_.remove(columnFamily.name());
-        int delta = (-1) * columnFamily.size();
-    }
-
-    public boolean isEmpty()
-    {
-        return (columnFamilies_.size() == 0);
-    }
-
-    /*
-     * This function will repair the current row with the input row
-     * what that means is that if there are any differences between the 2 rows then
-     * this function will make the current row take the latest changes.
-     */
-    public void repair(Row rowOther)
-    {
-        for (ColumnFamily cfOld : rowOther.getColumnFamilies())
-        {
-            ColumnFamily cf = columnFamilies_.get(cfOld.name());
-            if (cf == null)
-            {
-                addColumnFamily(cfOld);
-            }
-            else
-            {
-                columnFamilies_.remove(cf.name());
-                addColumnFamily(ColumnFamily.resolve(Arrays.asList(cfOld, cf)));
-            }
-        }
-    }
-
-    /*
-     * This function will calculate the difference between 2 rows
-     * and return the resultant row. This assumes that the row that
-     * is being submitted is a super set of the current row so
-     * it only calculates additional
-     * difference and does not take care of what needs to be removed from the current row to make
-     * it same as the input row.
-     */
-    public Row diff(Row rowComposite)
-    {
-        Row rowDiff = new Row(table_, key_);
-
-        for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
-        {
-            ColumnFamily cf = columnFamilies_.get(cfComposite.name());
-            if (cf == null)
-                rowDiff.addColumnFamily(cfComposite);
-            else
-            {
-                ColumnFamily cfDiff = cf.diff(cfComposite);
-                if (cfDiff != null)
-                    rowDiff.addColumnFamily(cfDiff);
-            }
-        }
-        if (rowDiff.getColumnFamilies().isEmpty())
-            return null;
-        else
-            return rowDiff;
-    }
-
-    public Row cloneMe()
-    {
-        Row row = new Row(table_, key_);
-        row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
-        return row;
-    }
-
-    public byte[] digest()
-    {
-        Set<String> cfamilies = columnFamilies_.keySet();
-        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
-        for (String cFamily : cfamilies)
-        {
-            if (xorHash.length == 0)
-            {
-                xorHash = columnFamilies_.get(cFamily).digest();
-            }
-            else
-            {
-                xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
-            }
-        }
-        return xorHash;
-    }
-
-    void clear()
-    {
-        columnFamilies_.clear();
-    }
-
-    public String toString()
-    {
-        return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
-    }
-}
-
-class RowSerializer implements ICompactSerializer<Row>
-{
-    public void serialize(Row row, DataOutputStream dos) throws IOException
-    {
-        dos.writeUTF(row.getTable());
-        dos.writeUTF(row.key());
-        Collection<ColumnFamily> columnFamilies = row.getColumnFamilies();
-        int size = columnFamilies.size();
-        dos.writeInt(size);
-
-        for (ColumnFamily cf : columnFamilies)
-        {
-            ColumnFamily.serializer().serialize(cf, dos);
-        }
-    }
-
-    public Row deserialize(DataInputStream dis) throws IOException
-    {
-        String table = dis.readUTF();
-        String key = dis.readUTF();
-        Row row = new Row(table, key);
-        int size = dis.readInt();
-
-        for (int i = 0; i < size; ++i)
-        {
-            ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
-            row.addColumnFamily(cf);
-        }
-        return row;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Arrays;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * In-memory representation of one row: a table name, a row key and the
+ * column families of that row, indexed by column family name.
+ */
+public class Row
+{
+    private static Logger logger_ = Logger.getLogger(Row.class);
+    private static RowSerializer serializer = new RowSerializer();
+
+    private String table_;
+    private String key_;
+    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
+
+    static RowSerializer serializer()
+    {
+        return serializer;
+    }
+
+    /**
+     * @param table name of the table the row belongs to; must not be null (checked by assertion)
+     * @param key   the row key
+     */
+    public Row(String table, String key)
+    {
+        assert table != null;
+        this.table_ = table;
+        this.key_ = key;
+    }
+
+    // only for use by RMVH
+    Row()
+    {
+    }
+
+    public String getTable()
+    {
+        return table_;
+    }
+
+    public String key()
+    {
+        return key_;
+    }
+
+    void setKey(String key)
+    {
+        key_ = key;
+    }
+
+    public void setTable(String table)
+    {
+        table_ = table;
+    }
+
+    /** Returns the live key-set view of the underlying map, not a copy. */
+    public Set<String> getColumnFamilyNames()
+    {
+        return columnFamilies_.keySet();
+    }
+
+    /** Returns the live values view of the underlying map, not a copy. */
+    public Collection<ColumnFamily> getColumnFamilies()
+    {
+        return columnFamilies_.values();
+    }
+
+    /** Returns the column family stored under cfName, or null when there is none. */
+    public ColumnFamily getColumnFamily(String cfName)
+    {
+        return columnFamilies_.get(cfName);
+    }
+
+    /** Stores the column family under its own name, replacing any previous entry with that name. */
+    void addColumnFamily(ColumnFamily columnFamily)
+    {
+        columnFamilies_.put(columnFamily.name(), columnFamily);
+    }
+
+    /** Removes the entry stored under the given column family's name, if any. */
+    void removeColumnFamily(ColumnFamily columnFamily)
+    {
+        columnFamilies_.remove(columnFamily.name());
+    }
+
+    public boolean isEmpty()
+    {
+        return columnFamilies_.isEmpty();
+    }
+
+    /*
+     * Repairs the current row with the input row: column families present
+     * only in rowOther are added as they are; column families present in both
+     * rows are replaced by the result of ColumnFamily.resolve() on the pair.
+     */
+    public void repair(Row rowOther)
+    {
+        for (ColumnFamily cfOld : rowOther.getColumnFamilies())
+        {
+            ColumnFamily cf = columnFamilies_.get(cfOld.name());
+            if (cf == null)
+            {
+                addColumnFamily(cfOld);
+            }
+            else
+            {
+                columnFamilies_.remove(cf.name());
+                addColumnFamily(ColumnFamily.resolve(Arrays.asList(cfOld, cf)));
+            }
+        }
+    }
+
+    /*
+     * Calculates the difference between this row and rowComposite and returns
+     * it as a new row. This assumes that the submitted row is a super set of
+     * the current row, so it only calculates what is additional in
+     * rowComposite and does not take care of what needs to be removed from
+     * the current row to make it the same as the input row.
+     *
+     * Returns null when there is no difference.
+     */
+    public Row diff(Row rowComposite)
+    {
+        Row rowDiff = new Row(table_, key_);
+
+        for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
+        {
+            ColumnFamily cf = columnFamilies_.get(cfComposite.name());
+            if (cf == null)
+            {
+                // column family unknown to this row: all of it is "additional"
+                rowDiff.addColumnFamily(cfComposite);
+            }
+            else
+            {
+                ColumnFamily cfDiff = cf.diff(cfComposite);
+                if (cfDiff != null)
+                    rowDiff.addColumnFamily(cfDiff);
+            }
+        }
+        return rowDiff.getColumnFamilies().isEmpty() ? null : rowDiff;
+    }
+
+    /**
+     * Returns a new Row with the same table and key and a new map holding the
+     * same ColumnFamily objects (the column families themselves are not copied).
+     */
+    public Row cloneMe()
+    {
+        Row row = new Row(table_, key_);
+        row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
+        return row;
+    }
+
+    /**
+     * Combines the digests of all column families with FBUtilities.xor().
+     * Returns an empty array when the row has no column families.
+     */
+    public byte[] digest()
+    {
+        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+        for (ColumnFamily cf : columnFamilies_.values())
+        {
+            byte[] cfDigest = cf.digest();
+            // an empty accumulator means "nothing folded in yet": seed it with this digest
+            xorHash = (xorHash.length == 0) ? cfDigest : FBUtilities.xor(xorHash, cfDigest);
+        }
+        return xorHash;
+    }
+
+    void clear()
+    {
+        columnFamilies_.clear();
+    }
+
+    public String toString()
+    {
+        // close the bracket before the parenthesis so the delimiters nest correctly
+        return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + "])";
+    }
+}
+
+/**
+ * Reads and writes Row objects. The layout is: table name (UTF), row key
+ * (UTF), number of column families (int), then each column family as written
+ * by ColumnFamily.serializer().
+ */
+class RowSerializer implements ICompactSerializer<Row>
+{
+    /** Writes the row to dos in the layout described on this class. */
+    public void serialize(Row row, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(row.getTable());
+        dos.writeUTF(row.key());
+
+        Collection<ColumnFamily> families = row.getColumnFamilies();
+        dos.writeInt(families.size());
+        for (ColumnFamily family : families)
+        {
+            ColumnFamily.serializer().serialize(family, dos);
+        }
+    }
+
+    /** Reads back a row written by serialize(). */
+    public Row deserialize(DataInputStream dis) throws IOException
+    {
+        String tableName = dis.readUTF();
+        String rowKey = dis.readUTF();
+        Row result = new Row(tableName, rowKey);
+
+        // the count precedes the column families on the stream
+        for (int remaining = dis.readInt(); remaining > 0; remaining--)
+        {
+            result.addColumnFamily(ColumnFamily.serializer().deserialize(dis));
+        }
+        return result;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jul 30 15:30:21 2009
@@ -1,345 +1,345 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.nio.ByteBuffer;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.BatchMutationSuper;
-import org.apache.cassandra.service.BatchMutation;
-import org.apache.cassandra.service.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutation implements Serializable
-{
-    private static ICompactSerializer<RowMutation> serializer_;
-    public static final String HINT = "HINT";
-
-    static
-    {
-        serializer_ = new RowMutationSerializer();
-    }   
-
-    static ICompactSerializer<RowMutation> serializer()
-    {
-        return serializer_;
-    }
-
-    private String table_;
-    private String key_;
-    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
-
-    public RowMutation(String table, String key)
-    {
-        table_ = table;
-        key_ = key;
-    }
-
-    public RowMutation(String table, Row row)
-    {
-        table_ = table;
-        key_ = row.key();
-        for (ColumnFamily cf : row.getColumnFamilies())
-        {
-            add(cf);
-        }
-    }
-
-    protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
-    {
-        table_ = table;
-        key_ = key;
-        modifications_ = modifications;
-    }
-
-    public String table()
-    {
-        return table_;
-    }
-
-    public String key()
-    {
-        return key_;
-    }
-
-    public Set<String> columnFamilyNames()
-    {
-        return modifications_.keySet();
-    }
-
-    void addHints(String key, String host) throws IOException
-    {
-        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
-        add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
-    }
-
-    /*
-     * Specify a column family name and the corresponding column
-     * family object.
-     * param @ cf - column family name
-     * param @ columnFamily - the column family.
-    */
-    public void add(ColumnFamily columnFamily)
-    {
-        if (modifications_.containsKey(columnFamily.name()))
-        {
-            throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
-        }
-        modifications_.put(columnFamily.name(), columnFamily);
-    }
-
-    /*
-     * Specify a column name and a corresponding value for
-     * the column. Column name is specified as <column family>:column.
-     * This will result in a ColumnFamily associated with
-     * <column family> as name and a Column with <column>
-     * as name. The column can be further broken up
-     * as super column name : columnname  in case of super columns
-     *
-     * param @ cf - column name as <column family>:<column>
-     * param @ value - value associated with the column
-     * param @ timestamp - timestamp associated with this data.
-    */
-    public void add(QueryPath path, byte[] value, long timestamp)
-    {
-        ColumnFamily columnFamily = modifications_.get(path.columnFamilyName);
-        if (columnFamily == null)
-        {
-            columnFamily = ColumnFamily.create(table_, path.columnFamilyName);
-        }
-        columnFamily.addColumn(path, value, timestamp);
-        modifications_.put(path.columnFamilyName, columnFamily);
-    }
-
-    public void delete(QueryPath path, long timestamp)
-    {
-        assert path.columnFamilyName != null;
-        String cfName = path.columnFamilyName;
-
-        if (modifications_.containsKey(cfName))
-        {
-            throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
-        }
-
-        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
-
-        ColumnFamily columnFamily = modifications_.get(cfName);
-        if (columnFamily == null)
-            columnFamily = ColumnFamily.create(table_, cfName);
-
-        if (path.superColumnName == null && path.columnName == null)
-        {
-            columnFamily.delete(localDeleteTime, timestamp);
-        }
-        else if (path.columnName == null)
-        {
-            SuperColumn sc = new SuperColumn(path.superColumnName, DatabaseDescriptor.getSubComparator(table_, cfName));
-            sc.markForDeleteAt(localDeleteTime, timestamp);
-            columnFamily.addColumn(sc);
-        }
-        else
-        {
-            ByteBuffer bytes = ByteBuffer.allocate(4);
-            bytes.putInt(localDeleteTime);
-            columnFamily.addColumn(path, bytes.array(), timestamp, true);
-        }
-
-        modifications_.put(cfName, columnFamily);
-    }
-
-    /*
-     * This is equivalent to calling commit. Applies the changes to
-     * to the table that is obtained by calling Table.open().
-    */
-    public void apply() throws IOException
-    {
-        Row row = new Row(table_, key_);
-        apply(row);
-    }
-
-    /*
-     * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
-    */
-    void apply(Row emptyRow) throws IOException
-    {
-        assert emptyRow.getColumnFamilies().size() == 0;
-        Table table = Table.open(table_);
-        for (String cfName : modifications_.keySet())
-        {
-            assert table.isValidColumnFamily(cfName);
-            emptyRow.addColumnFamily(modifications_.get(cfName));
-        }
-        table.apply(emptyRow);
-    }
-
-    /*
-     * This is equivalent to calling commit. Applies the changes to
-     * to the table that is obtained by calling Table.open().
-    */
-    void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
-    {
-        assert emptyRow.getColumnFamilies().size() == 0;
-        Table table = Table.open(table_);
-        Set<String> cfNames = modifications_.keySet();
-        for (String cfName : cfNames)
-        {
-            assert table.isValidColumnFamily(cfName);
-            emptyRow.addColumnFamily(modifications_.get(cfName));
-        }
-        table.load(emptyRow);
-    }
-
-    public Message makeRowMutationMessage() throws IOException
-    {
-        return makeRowMutationMessage(StorageService.mutationVerbHandler_);
-    }
-
-    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos);
-        EndPoint local = StorageService.getLocalStorageEndPoint();
-        EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
-        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
-    }
-
-    public static RowMutation getRowMutation(String table, BatchMutation batchMutation)
-    {
-        RowMutation rm = new RowMutation(table, batchMutation.key.trim());
-        for (String cfname : batchMutation.cfmap.keySet())
-        {
-            List<org.apache.cassandra.service.Column> list = batchMutation.cfmap.get(cfname);
-            for (org.apache.cassandra.service.Column column : list)
-            {
-                rm.add(new QueryPath(cfname, null, column.name), column.value, column.timestamp);
-            }
-        }
-        return rm;
-    }
-
-    public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
-    {
-        RowMutation rm = new RowMutation(table, batchMutationSuper.key.trim());
-        for (String cfName : batchMutationSuper.cfmap.keySet())
-        {
-            for (org.apache.cassandra.service.SuperColumn super_column : batchMutationSuper.cfmap.get(cfName))
-            {
-                for (org.apache.cassandra.service.Column column : super_column.columns)
-                {
-                    try
-                    {
-                        rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
-                    }
-                    catch (MarshalException e)
-                    {
-                        throw new InvalidRequestException(e.getMessage());
-                    }
-                }
-            }
-        }
-        return rm;
-    }
-
-    public String toString()
-    {
-        return "RowMutation(" +
-               "table='" + table_ + '\'' +
-               ", key='" + key_ + '\'' +
-               ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
-               ')';
-    }
-}
-
-class RowMutationSerializer implements ICompactSerializer<RowMutation>
-{
-    private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
-    {
-        int size = map.size();
-        dos.writeInt(size);
-        if (size > 0)
-        {
-            Set<String> keys = map.keySet();
-            for (String key : keys)
-            {
-                dos.writeUTF(key);
-                ColumnFamily cf = map.get(key);
-                if (cf != null)
-                {
-                    ColumnFamily.serializer().serialize(cf, dos);
-                }
-            }
-        }
-    }
-
-    public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
-    {
-        dos.writeUTF(rm.table());
-        dos.writeUTF(rm.key());
-
-        /* serialize the modifications_ in the mutation */
-        freezeTheMaps(rm.modifications_, dos);
-    }
-
-    private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
-    {
-        Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
-        int size = dis.readInt();
-        for (int i = 0; i < size; ++i)
-        {
-            String key = dis.readUTF();
-            ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
-            map.put(key, cf);
-        }
-        return map;
-    }
-
-    public RowMutation deserialize(DataInputStream dis) throws IOException
-    {
-        String table = dis.readUTF();
-        String key = dis.readUTF();
-        Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
-        return new RowMutation(table, key, modifications);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.BatchMutationSuper;
+import org.apache.cassandra.service.BatchMutation;
+import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutation implements Serializable
+{
+    private static ICompactSerializer<RowMutation> serializer_;
+    public static final String HINT = "HINT";
+
+    static
+    {
+        serializer_ = new RowMutationSerializer();
+    }   
+
+    /** Returns the shared serializer for RowMutation objects. */
+    static ICompactSerializer<RowMutation> serializer()
+    {
+        return RowMutation.serializer_;
+    }
+
+    private String table_;
+    private String key_;
+    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
+
+    /** Creates an empty mutation for the given table and row key. */
+    public RowMutation(String table, String key)
+    {
+        this.table_ = table;
+        this.key_ = key;
+    }
+
+    /**
+     * Creates a mutation for the given table that takes its key and its
+     * column families from row. Each column family goes through
+     * add(ColumnFamily).
+     */
+    public RowMutation(String table, Row row)
+    {
+        this.table_ = table;
+        this.key_ = row.key();
+        for (ColumnFamily family : row.getColumnFamilies())
+            add(family);
+    }
+
+    /**
+     * Creates a mutation backed directly by the supplied map; the map is
+     * stored by reference, not copied.
+     */
+    protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
+    {
+        this.table_ = table;
+        this.key_ = key;
+        this.modifications_ = modifications;
+    }
+
+    /** Returns the name of the table this mutation targets. */
+    public String table()
+    {
+        return this.table_;
+    }
+
+    /** Returns the row key this mutation targets. */
+    public String key()
+    {
+        return this.key_;
+    }
+
+    /** Returns the key-set view of the modifications map (column family names). */
+    public Set<String> columnFamilyNames()
+    {
+        return this.modifications_.keySet();
+    }
+
+    /**
+     * Adds an empty-valued column to the hints column family, addressed by
+     * the UTF-8 bytes of key and host and stamped with the current time.
+     */
+    void addHints(String key, String host) throws IOException
+    {
+        String hintsCf = HintedHandOffManager.HINTS_CF;
+        byte[] keyBytes = key.getBytes("UTF-8");
+        byte[] hostBytes = host.getBytes("UTF-8");
+        QueryPath hintPath = new QueryPath(hintsCf, keyBytes, hostBytes);
+        add(hintPath, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
+    }
+
+    /*
+     * Registers a whole column family as a modification, keyed by the
+     * column family's own name.
+     * param @ columnFamily - the column family.
+     * Throws IllegalArgumentException if this mutation already holds a
+     * modification for a column family of that name.
+    */
+    public void add(ColumnFamily columnFamily)
+    {
+        String cfName = columnFamily.name();
+        if (modifications_.containsKey(cfName))
+            throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
+
+        modifications_.put(cfName, columnFamily);
+    }
+
+    /*
+     * Adds a column value to the column family named by the path. The
+     * column family is looked up in the pending modifications and created
+     * on first use; the column is added to it and the column family is
+     * then (re-)registered under its name.
+     *
+     * param @ path - identifies the column family and the column to write
+     * param @ value - value associated with the column
+     * param @ timestamp - timestamp associated with this data.
+    */
+    public void add(QueryPath path, byte[] value, long timestamp)
+    {
+        String cfName = path.columnFamilyName;
+        ColumnFamily family = modifications_.get(cfName);
+        if (family == null)
+            family = ColumnFamily.create(table_, cfName);
+
+        // register only after addColumn succeeded
+        family.addColumn(path, value, timestamp);
+        modifications_.put(cfName, family);
+    }
+
+    /**
+     * Records a deletion for the given path. Depending on how specific the
+     * path is, the deletion covers the whole column family (no super column
+     * and no column name), one super column (no column name), or a single
+     * column.
+     *
+     * @param path      what to delete; path.columnFamilyName must not be null
+     * @param timestamp timestamp associated with the deletion
+     * @throws IllegalArgumentException if this mutation already holds a
+     *         modification for the column family
+     */
+    public void delete(QueryPath path, long timestamp)
+    {
+        assert path.columnFamilyName != null;
+        String cfName = path.columnFamilyName;
+
+        if (modifications_.containsKey(cfName))
+        {
+            throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
+        }
+
+        // local wall-clock time of the delete, in seconds
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+
+        // The guard above guarantees there is no entry for cfName yet, so a
+        // fresh column family is always needed here (no lookup required).
+        ColumnFamily columnFamily = ColumnFamily.create(table_, cfName);
+
+        if (path.superColumnName == null && path.columnName == null)
+        {
+            columnFamily.delete(localDeleteTime, timestamp);
+        }
+        else if (path.columnName == null)
+        {
+            SuperColumn sc = new SuperColumn(path.superColumnName, DatabaseDescriptor.getSubComparator(table_, cfName));
+            sc.markForDeleteAt(localDeleteTime, timestamp);
+            columnFamily.addColumn(sc);
+        }
+        else
+        {
+            // single column: the column value carries the 4-byte local delete time
+            ByteBuffer bytes = ByteBuffer.allocate(4);
+            bytes.putInt(localDeleteTime);
+            columnFamily.addColumn(path, bytes.array(), timestamp, true);
+        }
+
+        modifications_.put(cfName, columnFamily);
+    }
+
+    /*
+     * This is equivalent to calling commit. Applies the changes
+     * to the table that is obtained by calling Table.open().
+    */
+    public void apply() throws IOException
+    {
+        // apply(Row) asserts that the row it receives holds no column families yet,
+        // so hand it a newly constructed Row for this table and key.
+        apply(new Row(table_, key_));
+    }
+
+    /*
+     * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
+    */
+    void apply(Row emptyRow) throws IOException
+    {
+        assert emptyRow.getColumnFamilies().size() == 0;
+        Table target = Table.open(table_);
+        // Copy every staged column family into the caller-supplied row, then apply it in one call.
+        for (Map.Entry<String, ColumnFamily> staged : modifications_.entrySet())
+        {
+            assert target.isValidColumnFamily(staged.getKey());
+            emptyRow.addColumnFamily(staged.getValue());
+        }
+        target.apply(emptyRow);
+    }
+
+    /*
+     * Same as apply(Row), except that the populated row is handed
+     * to Table.load() instead of Table.apply().
+    */
+    void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
+    {
+        assert emptyRow.getColumnFamilies().size() == 0;
+        Table target = Table.open(table_);
+        // Populate the row from the staged modifications, then pass it to Table.load().
+        for (Map.Entry<String, ColumnFamily> staged : modifications_.entrySet())
+        {
+            assert target.isValidColumnFamily(staged.getKey());
+            emptyRow.addColumnFamily(staged.getValue());
+        }
+        target.load(emptyRow);
+    }
+
+    public Message makeRowMutationMessage() throws IOException
+    {
+        // No verb given: address the message to StorageService.mutationVerbHandler_.
+        String defaultVerb = StorageService.mutationVerbHandler_;
+        return makeRowMutationMessage(defaultVerb);
+    }
+
+    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+    {
+        // Serialize this mutation into an in-memory buffer that becomes the message body.
+        ByteArrayOutputStream body = new ByteArrayOutputStream();
+        serializer().serialize(this, new DataOutputStream(body));
+
+        // Use the local storage endpoint as sender; if there is none, build one from
+        // this host's address and port 7000.
+        EndPoint sender = StorageService.getLocalStorageEndPoint();
+        if (sender == null)
+        {
+            sender = new EndPoint(FBUtilities.getHostAddress(), 7000);
+        }
+        return new Message(sender, StorageService.mutationStage_, verbHandlerName, body.toByteArray());
+    }
+
+    public static RowMutation getRowMutation(String table, BatchMutation batchMutation)
+    {
+        // One RowMutation for the (trimmed) batch key; each column of each column
+        // family in the batch becomes an add() with no super column.
+        RowMutation mutation = new RowMutation(table, batchMutation.key.trim());
+        for (String cfName : batchMutation.cfmap.keySet())
+        {
+            for (org.apache.cassandra.service.Column thriftColumn : batchMutation.cfmap.get(cfName))
+            {
+                QueryPath target = new QueryPath(cfName, null, thriftColumn.name);
+                mutation.add(target, thriftColumn.value, thriftColumn.timestamp);
+            }
+        }
+        return mutation;
+    }
+
+    public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
+    {
+        // One RowMutation for the (trimmed) batch key; every column of every super
+        // column in the batch becomes an add() under that super column.
+        RowMutation mutation = new RowMutation(table, batchMutationSuper.key.trim());
+        for (String cfName : batchMutationSuper.cfmap.keySet())
+        {
+            for (org.apache.cassandra.service.SuperColumn thriftSuper : batchMutationSuper.cfmap.get(cfName))
+            {
+                for (org.apache.cassandra.service.Column thriftColumn : thriftSuper.columns)
+                {
+                    try
+                    {
+                        QueryPath target = new QueryPath(cfName, thriftSuper.name, thriftColumn.name);
+                        mutation.add(target, thriftColumn.value, thriftColumn.timestamp);
+                    }
+                    catch (MarshalException badName)
+                    {
+                        // Reported to the caller as an invalid request carrying the same message.
+                        throw new InvalidRequestException(badName.getMessage());
+                    }
+                }
+            }
+        }
+        return mutation;
+    }
+
+    public String toString()
+    {
+        // Renders as RowMutation(table='..', key='..', modifications=[cf, cf, ...])
+        StringBuilder text = new StringBuilder("RowMutation(");
+        text.append("table='").append(table_).append('\'');
+        text.append(", key='").append(key_).append('\'');
+        text.append(", modifications=[").append(StringUtils.join(modifications_.values(), ", ")).append("]");
+        text.append(')');
+        return text.toString();
+    }
+}
+
+/*
+ * Compact wire format for a RowMutation:
+ *   UTF table name, UTF key, int entry count, then per entry a UTF column
+ *   family name followed by the serialized ColumnFamily.
+ */
+class RowMutationSerializer implements ICompactSerializer<RowMutation>
+{
+    /*
+     * Writes the entry count followed by one (name, serialized column family) pair
+     * per entry. The reader (defreezeTheMaps) unconditionally expects a column family
+     * after every name, so a null value cannot be represented: it is rejected with an
+     * IOException instead of emitting a name with no payload, which would leave the
+     * stream out of step for the reader.
+    */
+    private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+    {
+        dos.writeInt(map.size());
+        for (Map.Entry<String, ColumnFamily> entry : map.entrySet())
+        {
+            ColumnFamily cf = entry.getValue();
+            if (cf == null)
+            {
+                throw new IOException("Cannot serialize null ColumnFamily for " + entry.getKey());
+            }
+            dos.writeUTF(entry.getKey());
+            ColumnFamily.serializer().serialize(cf, dos);
+        }
+    }
+
+    /* Writes the table name, the key, and then the modifications map. */
+    public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(rm.table());
+        dos.writeUTF(rm.key());
+
+        /* serialize the modifications_ in the mutation */
+        freezeTheMaps(rm.modifications_, dos);
+    }
+
+    /* Reads back the map written by freezeTheMaps: a count, then that many (name, column family) pairs. */
+    private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+    {
+        Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+        int size = dis.readInt();
+        for (int i = 0; i < size; ++i)
+        {
+            String key = dis.readUTF();
+            ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+            map.put(key, cf);
+        }
+        return map;
+    }
+
+    /* Reads the table name, the key, and the modifications map, in the order serialize() wrote them. */
+    public RowMutation deserialize(DataInputStream dis) throws IOException
+    {
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+        return new RowMutation(table, key, modifications);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Thu Jul 30 15:30:21 2009
@@ -1,95 +1,95 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutationMessage implements Serializable
-{   
-    public static final String hint_ = "HINT";
-    private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
-	
-    static RowMutationMessageSerializer serializer()
-    {
-        return serializer_;
-    }
-
-    public Message makeRowMutationMessage() throws IOException
-    {         
-        return makeRowMutationMessage(StorageService.mutationVerbHandler_);
-    }
-    
-    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        RowMutationMessage.serializer().serialize(this, dos);
-        EndPoint local = StorageService.getLocalStorageEndPoint();
-        EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
-        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());         
-    }
-    
-    @XmlElement(name="RowMutation")
-    private RowMutation rowMutation_;
-    
-    private RowMutationMessage()
-    {}
-    
-    public RowMutationMessage(RowMutation rowMutation)
-    {
-        rowMutation_ = rowMutation;
-    }
-    
-   public RowMutation getRowMutation()
-   {
-       return rowMutation_;
-   }
-}
-
-class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
-{
-	public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
-	{
-		RowMutation.serializer().serialize(rm.getRowMutation(), dos);
-	}
-	
-    public RowMutationMessage deserialize(DataInputStream dis) throws IOException
-    {
-    	RowMutation rm = RowMutation.serializer().deserialize(dis);
-    	return new RowMutationMessage(rm);
-    }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationMessage implements Serializable
+{
+    public static final String hint_ = "HINT";
+    private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
+
+    /* The mutation carried by this message. */
+    @XmlElement(name="RowMutation")
+    private RowMutation rowMutation_;
+
+    /* No-argument constructor, not callable from outside this class; leaves rowMutation_ unset. */
+    private RowMutationMessage()
+    {}
+
+    public RowMutationMessage(RowMutation rowMutation)
+    {
+        rowMutation_ = rowMutation;
+    }
+
+    static RowMutationMessageSerializer serializer()
+    {
+        return serializer_;
+    }
+
+    public RowMutation getRowMutation()
+    {
+        return rowMutation_;
+    }
+
+    /* Builds the wire message addressed to StorageService.mutationVerbHandler_. */
+    public Message makeRowMutationMessage() throws IOException
+    {
+        String defaultVerb = StorageService.mutationVerbHandler_;
+        return makeRowMutationMessage(defaultVerb);
+    }
+
+    /*
+     * Serializes this object into the body of a Message for the given verb handler.
+     * The sender is the local storage endpoint, or an endpoint built from this
+     * host's address and port 7000 when no local endpoint is available.
+    */
+    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+    {
+        ByteArrayOutputStream body = new ByteArrayOutputStream();
+        RowMutationMessage.serializer().serialize(this, new DataOutputStream(body));
+
+        EndPoint sender = StorageService.getLocalStorageEndPoint();
+        if (sender == null)
+        {
+            sender = new EndPoint(FBUtilities.getHostAddress(), 7000);
+        }
+        return new Message(sender, StorageService.mutationStage_, verbHandlerName, body.toByteArray());
+    }
+}
+
+class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
+{
+    /* Writes only the wrapped RowMutation, using RowMutation's own serializer. */
+    public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
+    {
+        ICompactSerializer<RowMutation> delegate = RowMutation.serializer();
+        delegate.serialize(rm.getRowMutation(), dos);
+    }
+
+    /* Reads a RowMutation with RowMutation's own serializer and wraps it in a message. */
+    public RowMutationMessage deserialize(DataInputStream dis) throws IOException
+    {
+        return new RowMutationMessage(RowMutation.serializer().deserialize(dis));
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,96 +1,96 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutationVerbHandler implements IVerbHandler
-{
-    protected static class RowMutationContext
-    {
-        protected Row row_ = new Row();
-        protected DataInputBuffer buffer_ = new DataInputBuffer();
-    }
-
-    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
-    /* We use this so that we can reuse the same row mutation context for the mutation. */
-    private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
-    public void doVerb(Message message)
-    {
-        byte[] bytes = message.getMessageBody();
-        /* Obtain a Row Mutation Context from TLS */
-        RowMutationContext rowMutationCtx = tls_.get();
-        if ( rowMutationCtx == null )
-        {
-            rowMutationCtx = new RowMutationContext();
-            tls_.set(rowMutationCtx);
-        }
-
-        rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
-        try
-        {
-            RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Applying " + rm);
-
-            /* Check if there were any hints in this message */
-            byte[] hintedBytes = message.getHeader(RowMutation.HINT);
-            if ( hintedBytes != null && hintedBytes.length > 0 )
-            {
-            	EndPoint hint = EndPoint.fromBytes(hintedBytes);
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Adding hint for " + hint);
-                /* add necessary hints to this mutation */
-                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
-                hintedMutation.addHints(rm.key(), hint.getHost());
-                hintedMutation.apply();
-            }
-
-            rowMutationCtx.row_.clear();
-            rowMutationCtx.row_.setTable(rm.table());
-            rowMutationCtx.row_.setKey(rm.key());
-            rm.apply(rowMutationCtx.row_);
-
-            WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
-            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            if (logger_.isDebugEnabled())
-              logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
-        }
-        catch (IOException e)
-        {
-            logger_.error("Error in row mutation", e);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationVerbHandler implements IVerbHandler
+{
+    /* Scratch objects reused by one thread from message to message. */
+    protected static class RowMutationContext
+    {
+        protected Row row_ = new Row();
+        protected DataInputBuffer buffer_ = new DataInputBuffer();
+    }
+
+    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+    /*
+     * We use this so that we can reuse the same row mutation context for the mutation.
+     * This must be a plain ThreadLocal: an InheritableThreadLocal would pass the parent's
+     * context object to every thread the parent creates, so two threads could end up
+     * resetting and filling the same Row and DataInputBuffer at the same time.
+     */
+    private static ThreadLocal<RowMutationContext> tls_ = new ThreadLocal<RowMutationContext>();
+
+    /*
+     * Handles one row mutation message:
+     *   1. deserializes the RowMutation from the message body;
+     *   2. if the message carries a non-empty RowMutation.HINT header, applies an extra
+     *      mutation to Table.SYSTEM_TABLE, keyed by the mutated table's name, that
+     *      records the mutation key against the hinted host;
+     *   3. applies the mutation itself through the thread's reusable Row;
+     *   4. sends a WriteResponse one-way back to the sender.
+     * An IOException anywhere in these steps is logged and no response is sent.
+     */
+    public void doVerb(Message message)
+    {
+        byte[] bytes = message.getMessageBody();
+        /* Obtain a Row Mutation Context from TLS */
+        RowMutationContext rowMutationCtx = tls_.get();
+        if ( rowMutationCtx == null )
+        {
+            rowMutationCtx = new RowMutationContext();
+            tls_.set(rowMutationCtx);
+        }
+
+        rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
+        try
+        {
+            RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Applying " + rm);
+
+            /* Check if there were any hints in this message */
+            byte[] hintedBytes = message.getHeader(RowMutation.HINT);
+            if ( hintedBytes != null && hintedBytes.length > 0 )
+            {
+                EndPoint hint = EndPoint.fromBytes(hintedBytes);
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Adding hint for " + hint);
+                /* add necessary hints to this mutation */
+                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
+                hintedMutation.addHints(rm.key(), hint.getHost());
+                hintedMutation.apply();
+            }
+
+            /* Reset the reusable row before handing it to apply(). */
+            rowMutationCtx.row_.clear();
+            rowMutationCtx.row_.setTable(rm.table());
+            rowMutationCtx.row_.setKey(rm.key());
+            rm.apply(rowMutationCtx.row_);
+
+            WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+            if (logger_.isDebugEnabled())
+              logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
+            MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
+        }
+        catch (IOException e)
+        {
+            logger_.error("Error in row mutation", e);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java Thu Jul 30 15:30:21 2009
@@ -1,87 +1,87 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.io.IOException;
-
-
-/**
- * This class is used to loop through a retrieved column family
- * to get all columns in Iterator style. Usage is as follows:
- * Scanner scanner = new Scanner("table");
- * scanner.fetchColumnfamily(key, "column-family");
- * 
- * while ( scanner.hasNext() )
- * {
- *     Column column = scanner.next();
- *     // Do something with the column
- * }
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Scanner implements IScanner<IColumn>
-{
-    /* Table over which we are scanning. */
-    private String table_; 
-    /* Iterator when iterating over the columns of a given key in a column family */
-    private Iterator<IColumn> columnIt_;
-        
-    public Scanner(String table)
-    {
-        table_ = table;
-    }
-    
-    /**
-     * Fetch the columns associated with this key for the specified column family.
-     * This method basically sets up an iterator internally and then provides an 
-     * iterator like interface to iterate over the columns.
-     * @param key key we are interested in.
-     * @param cf column family we are interested in.
-     * @throws IOException
-     */
-    public void fetch(String key, String cf) throws IOException
-    {        
-        if ( cf != null )
-        {
-            Table table = Table.open(table_);
-            ColumnFamily columnFamily = table.get(key, cf);
-            if ( columnFamily != null )
-            {
-                Collection<IColumn> columns = columnFamily.getSortedColumns();
-                columnIt_ = columns.iterator();
-            }
-        }
-    }        
-    
-    public boolean hasNext() throws IOException
-    {
-        return columnIt_.hasNext();
-    }
-    
-    public IColumn next()
-    {
-        return columnIt_.next();
-    }
-    
-    public void close() throws IOException
-    {
-        throw new UnsupportedOperationException("This operation is not supported in the Scanner");
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.IOException;
+
+
+/**
+ * This class is used to loop through a retrieved column family
+ * to get all columns in Iterator style. Usage is as follows:
+ * Scanner scanner = new Scanner("table");
+ * scanner.fetch(key, "column-family");
+ * 
+ * while ( scanner.hasNext() )
+ * {
+ *     IColumn column = scanner.next();
+ *     // Do something with the column
+ * }
+ * When nothing has been fetched, or the last fetch found no column family,
+ * hasNext() returns false and next() throws NoSuchElementException.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Scanner implements IScanner<IColumn>
+{
+    /* Table over which we are scanning. */
+    private String table_;
+    /*
+     * Iterator when iterating over the columns of a given key in a column family.
+     * Null until a fetch succeeds, and again after a fetch that finds nothing.
+     */
+    private Iterator<IColumn> columnIt_;
+
+    public Scanner(String table)
+    {
+        table_ = table;
+    }
+
+    /**
+     * Fetch the columns associated with this key for the specified column family.
+     * This method basically sets up an iterator internally and then provides an
+     * iterator like interface to iterate over the columns. Any iterator left by an
+     * earlier fetch is discarded first, so a fetch that finds nothing (null column
+     * family name, or no column family for the key) leaves the scanner empty
+     * instead of replaying the previous result.
+     * @param key key we are interested in.
+     * @param cf column family we are interested in.
+     * @throws IOException
+     */
+    public void fetch(String key, String cf) throws IOException
+    {
+        columnIt_ = null;
+        if ( cf == null )
+        {
+            return;
+        }
+
+        Table table = Table.open(table_);
+        ColumnFamily columnFamily = table.get(key, cf);
+        if ( columnFamily != null )
+        {
+            Collection<IColumn> columns = columnFamily.getSortedColumns();
+            columnIt_ = columns.iterator();
+        }
+    }
+
+    /**
+     * @return true if another column is available; false when the columns are
+     * exhausted or nothing has been fetched.
+     */
+    public boolean hasNext() throws IOException
+    {
+        return columnIt_ != null && columnIt_.hasNext();
+    }
+
+    /**
+     * @return the next column of the fetched column family.
+     * @throws NoSuchElementException if nothing has been fetched or no columns remain.
+     */
+    public IColumn next()
+    {
+        if ( columnIt_ == null )
+        {
+            throw new NoSuchElementException("No column family has been fetched");
+        }
+        return columnIt_.next();
+    }
+
+    /**
+     * Not supported by this scanner; always throws UnsupportedOperationException.
+     */
+    public void close() throws IOException
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the Scanner");
+    }
+}