You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 19:03:05 UTC

svn commit: r771933 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: HintedHandOffManager.java MinorCompactionManager.java

Author: jbellis
Date: Tue May  5 17:03:04 2009
New Revision: 771933

URL: http://svn.apache.org/viewvc?rev=771933&view=rev
Log:
cleanup and refactor HintedHandOffManager.
patch by jbellis and Jun Rao for CASSANDRA-34

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771933&r1=771932&r2=771933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue May  5 17:03:04 2009
@@ -39,9 +39,14 @@
 
 
 /**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ * There are two ways hinted data gets delivered to the intended nodes.
+ *
+ * runHints() runs periodically and pushes the hinted data on this node to
+ * every intended node.
+ *
+ * runDelieverHints() is called when some other node starts up (potentially
+ * from a failure) and delivers the hinted data just to that node.
  */
-
 public class HintedHandOffManager implements IComponentShutdown
 {
     private static HintedHandOffManager instance_;
@@ -70,184 +75,153 @@
         return instance_;
     }
 
-    class HintedHandOff implements Runnable
+    private static boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
     {
-        private ColumnFamilyStore columnFamilyStore_ = null;
-        private EndPoint endPoint_ = null;
-
-        HintedHandOff(ColumnFamilyStore columnFamilyStore)
-        {
-        	columnFamilyStore_ = columnFamilyStore;
-        }
-        HintedHandOff(EndPoint endPoint)
+        EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+        if (!FailureDetector.instance().isAlive(endPoint))
         {
-        	endPoint_ = endPoint;
+            return false;
         }
 
-        private boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Row row = table.get(key);
+        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+        Message message = rm.makeRowMutationMessage();
+
+        QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+        MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+        return quorumResponseHandler.get();
+    }
+
+    private static void deleteEndPoint(String endpointAddress, String key) throws Exception
+    {
+        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
+        rm.apply();
+    }
+
+    private static void deleteKey(String key) throws Exception
+    {
+        // delete the hint record
+        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+        rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+        rm.apply();
+    }
+
+    private static void deliverAllHints(ColumnFamilyStore columnFamilyStore)
+    {
+        logger_.debug("Started hinted handoff of " + columnFamilyStore.columnFamily_);
+
+        // 1. Scan through all the keys that we need to handoff
+        // 2. For each key read the list of recepients and send
+        // 3. Delete that recepient from the key if write was successful
+        // 4. If all writes were success for a given key we can even delete the key .
+        // 5. Now force a flush
+        // 6. Do major compaction to clean up all deletes etc.
+        // 7. I guess we r done
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        ColumnFamily hintColumnFamily = null;
+        try
         {
-            EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
-            if (!FailureDetector.instance().isAlive(endPoint))
+            hintColumnFamily = table.get(key_, Table.hints_);
+            if (hintColumnFamily == null)
             {
-                return false;
+                columnFamilyStore.forceFlush();
+                return;
+            }
+            Collection<IColumn> keys = hintColumnFamily.getAllColumns();
+            if (keys == null)
+            {
+                return;
             }
 
-            Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-            Row row = table.get(key);
-            RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
-            Message message = rm.makeRowMutationMessage();
-
-            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
-            MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
-            return quorumResponseHandler.get();
+            for (IColumn keyColumn : keys)
+            {
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                // endpoints could be null if the server were terminated during a previous runHints
+                // after deleteEndPoint but before deleteKey.
+                boolean allsuccess = true;
+                if (endpoints != null)
+                {
+                    for (IColumn endpoint : endpoints)
+                    {
+                        if (sendMessage(endpoint.name(), keyColumn.name()))
+                        {
+                            deleteEndPoint(endpoint.name(), keyColumn.name());
+                        }
+                        else
+                        {
+                            allsuccess = false;
+                        }
+                    }
+                }
+                if (allsuccess)
+                {
+                    deleteKey(keyColumn.name());
+                }
+            }
+            columnFamilyStore.forceFlush();
+            columnFamilyStore.forceCompaction(null, null, 0, null);
         }
-
-        private void deleteEndPoint(String endpointAddress, String key) throws Exception
+        catch (Exception ex)
         {
-        	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
-        	rm.apply();
+            logger_.error(ex.getMessage());
         }
-
-        private void deleteKey(String key) throws Exception
+        finally
         {
-        	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
-        	rm.apply();
+            logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
         }
+    }
 
-        private void runHints()
-        {
-            logger_.debug("Started  hinted handoff " + columnFamilyStore_.columnFamily_);
+    private static void deliverHintsToEndpoint(EndPoint endPoint)
+    {
+        logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
 
-            // 1. Scan through all the keys that we need to handoff
-            // 2. For each key read the list of recepients and send
-            // 3. Delete that recepient from the key if write was successful
-            // 4. If all writes were success for a given key we can even delete the key .
-            // 5. Now force a flush
-            // 6. Do major compaction to clean up all deletes etc.
-            // 7. I guess we r done
-            Table table =  Table.open(DatabaseDescriptor.getTables().get(0));
-            ColumnFamily hintedColumnFamily = null;
-            boolean success = false;
-            boolean allsuccess = true;
-            try
+        // 1. Scan through all the keys that we need to handoff
+        // 2. For each key read the list of recepients if teh endpoint matches send
+        // 3. Delete that recepient from the key if write was successful
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        ColumnFamily hintedColumnFamily = null;
+        try
+        {
+            hintedColumnFamily = table.get(key_, Table.hints_);
+            if (hintedColumnFamily == null)
             {
-            	hintedColumnFamily = table.get(key_, Table.hints_);
-            	if(hintedColumnFamily == null)
-            	{
-                    // Force flush now
-                    columnFamilyStore_.forceFlush();
-            		return;
-            	}
-            	Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
-            	if(keys != null)
-            	{
-                	for(IColumn key : keys)
-                	{
-                		// Get all the endpoints for teh key
-                		Collection<IColumn> endpoints =  key.getSubColumns();
-                		allsuccess = true;
-                		if ( endpoints != null )
-                		{
-                			for(IColumn endpoint : endpoints )
-                			{
-                				success = sendMessage(endpoint.name(), key.name());
-                				if(success)
-                				{
-                					// Delete the endpoint from the list
-                					deleteEndPoint(endpoint.name(), key.name());
-                				}
-                				else
-                				{
-                					allsuccess = false;
-                				}
-                			}
-                		}
-                		if(endpoints == null || allsuccess)
-                		{
-                			// Delete the key itself.
-                			deleteKey(key.name());
-                		}
-                	}
-            	}
-                // Force flush now
-                columnFamilyStore_.forceFlush();
-
-                // Now do a major compaction
-                columnFamilyStore_.forceCompaction(null, null, 0, null);
-            }
-            catch ( Exception ex)
-            {
-            	logger_.warn(ex.getMessage());
-            }
-            logger_.debug("Finished hinted handoff ..."+columnFamilyStore_.columnFamily_);
-        }
-
-        private void runDeliverHints(EndPoint to)
-        {
-            logger_.debug("Started  hinted handoff for endPoint " + endPoint_.getHost());
-
-            // 1. Scan through all the keys that we need to handoff
-            // 2. For each key read the list of recepients if teh endpoint matches send
-            // 3. Delete that recepient from the key if write was successful
-
-            Table table =  Table.open(DatabaseDescriptor.getTables().get(0));
-            ColumnFamily hintedColumnFamily = null;
-            boolean success = false;
-            try
+                return;
+            }
+            Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+            if (keys == null)
             {
-            	hintedColumnFamily = table.get(key_, Table.hints_);
-            	if(hintedColumnFamily == null)
-            		return;
-            	Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
-            	if(keys != null)
-            	{
-                	for(IColumn key : keys)
-                	{
-                		// Get all the endpoints for teh key
-                		Collection<IColumn> endpoints =  key.getSubColumns();
-                		if ( endpoints != null )
-                		{
-                			for(IColumn endpoint : endpoints )
-                			{
-                				if(endpoint.name().equals(endPoint_.getHost()))
-                				{
-	                				success = sendMessage(endpoint.name(), key.name());
-	                				if(success)
-	                				{
-	                					// Delete the endpoint from the list
-	                					deleteEndPoint(endpoint.name(), key.name());
-	                				}
-                				}
-                			}
-                		}
-                		if(endpoints == null)
-                		{
-                			// Delete the key itself.
-                			deleteKey(key.name());
-                		}
-                	}
-            	}
-            }
-            catch ( Exception ex)
-            {
-            	logger_.warn(ex.getMessage());
-            }
-            logger_.debug("Finished hinted handoff for endpoint ..." + endPoint_.getHost());
-        }
-
-        public void run()
-        {
-        	if(endPoint_ == null)
-        	{
-        		runHints();
-        	}
-        	else
-        	{
-        		runDeliverHints(endPoint_);
-        	}
+                return;
+            }
 
+            for (IColumn keyColumn : keys)
+            {
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                if (endpoints == null)
+                {
+                    deleteKey(keyColumn.name());
+                    continue;
+                }
+                for (IColumn endpoint : endpoints)
+                {
+                    if (endpoint.name().equals(endPoint.getHost()))
+                    {
+                        if (sendMessage(endpoint.name(), keyColumn.name()))
+                        {
+                            deleteEndPoint(endpoint.name(), keyColumn.name());
+                        }
+                    }
+                }
+            }
+        }
+        catch (Exception ex)
+        {
+            logger_.error(ex.getMessage());
+        }
+        finally
+        {
+            logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
         }
     }
 
@@ -256,10 +230,16 @@
     	StorageService.instance().registerComponentForShutdown(this);
     }
 
-    public void submit(ColumnFamilyStore columnFamilyStore)
+    public void submit(final ColumnFamilyStore columnFamilyStore)
     {
-    	executor_.scheduleWithFixedDelay(new HintedHandOff(columnFamilyStore), HintedHandOffManager.intervalInMins_,
-    			HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                deliverAllHints(columnFamilyStore);
+            }
+        };
+    	executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
     }
 
     /*
@@ -267,9 +247,16 @@
      * When we learn that some endpoint is back up we deliver the data
      * to him via an event driven mechanism.
     */
-    public void deliverHints(EndPoint to)
+    public void deliverHints(final EndPoint to)
     {
-    	executor_.submit(new HintedHandOff(to));
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                deliverHintsToEndpoint(to);
+            }
+        };
+    	executor_.submit(r);
     }
 
     public void shutdown()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=771933&r1=771932&r2=771933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Tue May  5 17:03:04 2009
@@ -31,15 +31,11 @@
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.db.HintedHandOffManager.HintedHandOff;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.IComponentShutdown;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )