You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/30 21:39:58 UTC

svn commit: r820417 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ service/ utils/

Author: jbellis
Date: Wed Sep 30 19:39:57 2009
New Revision: 820417

URL: http://svn.apache.org/viewvc?rev=820417&view=rev
Log:
formatting + cleanup.  patch by jbellis for CASSANDRA-462

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Sep 30 19:39:57 2009
@@ -23,6 +23,7 @@
 import java.security.MessageDigest;
 import java.io.IOException;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -37,6 +38,8 @@
 
 public final class Column implements IColumn
 {
+    private static Logger logger_ = Logger.getLogger(Column.class);
+
     private static ColumnSerializer serializer_ = new ColumnSerializer();
 
     public static ColumnSerializer serializer()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Wed Sep 30 19:39:57 2009
@@ -115,20 +115,12 @@
         boolean isDigest = dis.readBoolean();
         
         Row row = null;
-        if ( !isDigest )
+        if (!isDigest)
         {
             row = Row.serializer().deserialize(dis);
         }
-		
-		ReadResponse rmsg = null;
-    	if( isDigest  )
-        {
-    		rmsg =  new ReadResponse(digest);
-        }
-    	else
-        {
-    		rmsg =  new ReadResponse(row);
-        }
+
+        ReadResponse rmsg = isDigest ? new ReadResponse(digest) : new ReadResponse(row);
         rmsg.setIsDigestQuery(isDigest);
     	return rmsg;
     } 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Sep 30 19:39:57 2009
@@ -75,9 +75,8 @@
             }
             ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
             Table table = Table.open(readCommand.table);
-            Row row = null;
-            row = readCommand.getRow(table);
-            ReadResponse readResponse = null;
+            Row row = readCommand.getRow(table);
+            ReadResponse readResponse;
             if (readCommand.isDigestQuery())
             {
                 readResponse = new ReadResponse(row.digest());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Sep 30 19:39:57 2009
@@ -29,13 +29,11 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class Row
 {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Sep 30 19:39:57 2009
@@ -23,7 +23,6 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
@@ -129,21 +128,17 @@
 		
 		public void callMe(String key, String value)
 		{
-			handleResponses();
-		}
-		
-		private void handleResponses()
-		{
-			try
+            try
 			{
 				readResponseResolver_.resolve(new ArrayList<Message>(responses_));
-			}
-			catch ( DigestMismatchException ex )
-			{
-				throw new RuntimeException(ex);
-			}
-		}
-	}
+            }
+            catch (Exception ex)
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+
+    }
 
 	private static long scheduledTimeMillis_ = 600;
 	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Sep 30 19:39:57 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import java.util.List;
+import java.io.IOException;
 
 import org.apache.cassandra.net.Message;
 
@@ -32,7 +33,7 @@
 	 * repairs . Hence you need to derive a response resolver based on your
 	 * needs from this interface.
 	 */
-	public T resolve(List<Message> responses) throws DigestMismatchException;
+	public T resolve(List<Message> responses) throws DigestMismatchException, IOException;
 	public boolean isDataPresent(List<Message> responses);
 
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Sep 30 19:39:57 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -55,64 +56,67 @@
         startTime_ = System.currentTimeMillis();
     }
     
-    public T get() throws TimeoutException, DigestMismatchException
+    public T get() throws TimeoutException, DigestMismatchException, IOException
     {
-    	lock_.lock();
+        lock_.lock();
         try
-        {            
-            boolean bVal = true;            
+        {
+            boolean bVal = true;
             try
             {
-            	if ( !done_.get() )
+                if (!done_.get())
                 {
                     long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
-                    if(timeout > 0)
+                    if (timeout > 0)
+                    {
                         bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
+                    }
                     else
+                    {
                         bVal = false;
+                    }
                 }
             }
-            catch ( InterruptedException ex )
+            catch (InterruptedException ex)
             {
-                if (logger_.isDebugEnabled())
-                  logger_.debug( LogUtil.throwableToString(ex) );
+                throw new AssertionError(ex);
             }
-            
-            if ( !bVal && !done_.get() )
+
+            if (!bVal && !done_.get())
             {
                 StringBuilder sb = new StringBuilder("");
-                for ( Message message : responses_ )
+                for (Message message : responses_)
                 {
-                    sb.append(message.getFrom());                    
-                }                
-                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
+                    sb.append(message.getFrom());
+                }
+                throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
             }
         }
         finally
         {
             lock_.unlock();
-            for(Message response : responses_)
+            for (Message response : responses_)
             {
-            	MessagingService.removeRegisteredCallback( response.getMessageId() );
+                MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-    	return responseResolver_.resolve( responses_);
+        return responseResolver_.resolve(responses_);
     }
     
     public void response(Message message)
     {
         lock_.lock();
         try
-        {            
-            if ( !done_.get() )
+        {
+            if (!done_.get())
             {
-            	responses_.add( message );
-            	if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
-            	{
-            		done_.set(true);
-            		condition_.signal();            	
-            	}
+                responses_.add(message);
+                if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
+                {
+                    done_.set(true);
+                    condition_.signal();
+                }
             }
         }
         finally

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Sep 30 19:39:57 2009
@@ -54,8 +54,8 @@
 	 * repair request should be scheduled.
 	 * 
 	 */
-	public Row resolve(List<Message> responses) throws DigestMismatchException
-	{
+	public Row resolve(List<Message> responses) throws DigestMismatchException, IOException
+    {
         long startTime = System.currentTimeMillis();
 		Row retRow = null;
 		List<Row> rowList = new ArrayList<Row>();
@@ -76,38 +76,31 @@
 		{					            
             byte[] body = response.getMessageBody();
             bufIn.reset(body, body.length);
-            try
+            long start = System.currentTimeMillis();
+            ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+            if (logger_.isDebugEnabled())
+              logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+            if (result.isDigestQuery())
             {
-                long start = System.currentTimeMillis();
-                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
-                if (logger_.isDebugEnabled())
-                  logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
-    			if(!result.isDigestQuery())
-    			{
-    				rowList.add(result.row());
-    				endPoints.add(response.getFrom());
-    				key = result.row().key();
-    				table = result.row().getTable();
-    			}
-    			else
-    			{
-    				digest = result.digest();
-    				isDigestQuery = true;
-    			}
+                digest = result.digest();
+                isDigestQuery = true;
             }
-            catch( IOException ex )
+            else
             {
-                logger_.info(LogUtil.throwableToString(ex));
+                rowList.add(result.row());
+                endPoints.add(response.getFrom());
+                key = result.row().key();
+                table = result.row().getTable();
             }
-		}
+        }
 		// If there was a digest query compare it with all the data digests 
 		// If there is a mismatch then throw an exception so that read repair can happen.
-		if(isDigestQuery)
-		{
-			for(Row row: rowList)
-			{
-				if( !Arrays.equals(row.digest(), digest) )
-				{
+        if (isDigestQuery)
+        {
+            for (Row row : rowList)
+            {
+                if (!Arrays.equals(row.digest(), digest))
+                {
                     /* Wrap the key as the context in this exception */
 					throw new DigestMismatchException(row.key());
 				}
@@ -115,36 +108,36 @@
 		}
 		
         /* If the rowList is empty then we had some exception above. */
-        if ( rowList.size() == 0 )
+        if (rowList.size() == 0)
         {
             return retRow;
         }
-        
+
         /* Now calculate the resolved row */
-		retRow = new Row(table, key);
-		for (int i = 0 ; i < rowList.size(); i++)
-		{
-			retRow.repair(rowList.get(i));			
-		}
+        retRow = new Row(table, key);
+        for (int i = 0; i < rowList.size(); i++)
+        {
+            retRow.repair(rowList.get(i));
+        }
 
         // At  this point  we have the return row .
-		// Now we need to calculate the difference 
-		// so that we can schedule read repairs 
-		for (int i = 0 ; i < rowList.size(); i++)
-		{
-			// since retRow is the resolved row it can be used as the super set
-			Row diffRow = rowList.get(i).diff(retRow);
-			if(diffRow == null) // no repair needs to happen
-				continue;
-			// create the row mutation message based on the diff and schedule a read repair 
-			RowMutation rowMutation = new RowMutation(table, key);            			
-	        for (ColumnFamily cf : diffRow.getColumnFamilies())
-	        {
-	            rowMutation.add(cf);
-	        }
+        // Now we need to calculate the difference
+        // so that we can schedule read repairs
+        for (int i = 0; i < rowList.size(); i++)
+        {
+            // since retRow is the resolved row it can be used as the super set
+            Row diffRow = rowList.get(i).diff(retRow);
+            if (diffRow == null) // no repair needs to happen
+                continue;
+            // create the row mutation message based on the diff and schedule a read repair
+            RowMutation rowMutation = new RowMutation(table, key);
+            for (ColumnFamily cf : diffRow.getColumnFamilies())
+            {
+                rowMutation.add(cf);
+            }
             RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
-	        ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
-		}
+            ReadRepairManager.instance().schedule(endPoints.get(i), rowMutationMessage);
+        }
         if (logger_.isDebugEnabled())
             logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
 		return retRow;
@@ -152,26 +145,26 @@
 
 	public boolean isDataPresent(List<Message> responses)
 	{
-		boolean isDataPresent = false;
-		for (Message response : responses)
-		{
+        boolean isDataPresent = false;
+        for (Message response : responses)
+        {
             byte[] body = response.getMessageBody();
-			DataInputBuffer bufIn = new DataInputBuffer();
+            DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             try
             {
-    			ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
-    			if(!result.isDigestQuery())
-    			{
-    				isDataPresent = true;
-    			}
+                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+                if (!result.isDigestQuery())
+                {
+                    isDataPresent = true;
+                }
                 bufIn.close();
             }
-            catch(IOException ex)
+            catch (IOException ex)
             {
                 logger_.info(LogUtil.throwableToString(ex));
-            }                        
-		}
-		return isDataPresent;
-	}
+            }
+        }
+        return isDataPresent;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Sep 30 19:39:57 2009
@@ -352,10 +352,7 @@
             Message message = command.makeReadMessage();
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
-            IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
-                    DatabaseDescriptor.getQuorum(),
-                    readResponseResolver);
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
             EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
             List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
             /* Remove the local storage endpoint from the list. */
@@ -401,7 +398,7 @@
             }
             catch (DigestMismatchException ex)
             {
-                if ( DatabaseDescriptor.getConsistencyCheck())
+                if (DatabaseDescriptor.getConsistencyCheck())
                 {
                     IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
@@ -409,8 +406,7 @@
                             readResponseResolverRepair);
                     logger.info("DigestMismatchException: " + command.key);
                     Message messageRepair = command.makeReadMessage();
-                    MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex),
-                            quorumResponseHandlerRepair);
+                    MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
                     try
                     {
                         row = quorumResponseHandlerRepair.get();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Sep 30 19:39:57 2009
@@ -398,15 +398,12 @@
         return bytes;
     }
 
-    public static String bytesToHex(byte[] buf)
+    public static String bytesToHex(byte[] bytes)
     {
-        char[] chars = new char[2*buf.length];
-        for (int i = 0; i < buf.length; i++)
-        {
-            chars[i*2] = HEX_CHARS[(buf[i] & 0xF0) >>> 4];
-            chars[i*2+1] = HEX_CHARS[buf[i] & 0x0F];
-        }
-        return new String(chars);
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes)
+            sb.append(Integer.toHexString(b & 0xff));
+        return sb.toString();
     }
 
     public static String mapToString(Map<?,?> map)