You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/08 16:46:27 UTC

svn commit: r897234 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java HintedHandOffManager.java

Author: jbellis
Date: Fri Jan  8 15:46:22 2010
New Revision: 897234

URL: http://svn.apache.org/viewvc?rev=897234&view=rev
Log:
cleanup of HintedHandOffManager (HHOM).  patch by jbellis; tested by Brandon Williams for CASSANDRA-680.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=897234&r1=897233&r2=897234&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jan  8 15:46:22 2010
@@ -245,12 +245,6 @@
             sstables.add(sstable);
         }
         ssTables_.onStart(sstables);
-
-        // schedule hinted handoff
-        if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
-        {
-            HintedHandOffManager.instance().scheduleHandoffsFor(this);
-        }
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=897234&r1=897233&r2=897234&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jan  8 15:46:22 2010
@@ -86,11 +86,11 @@
     private static final Lock lock_ = new ReentrantLock();
     private static final Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
     final static long INTERVAL_IN_MS = 3600 * 1000; // check for ability to deliver hints this often
-    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
-    final Timer timer = new Timer("HINTED-HANDOFF-TIMER");
     public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
 
+    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
+
 
     public static HintedHandOffManager instance()
     {
@@ -110,6 +110,21 @@
         return instance_;
     }
 
+    public HintedHandOffManager()
+    {
+        new Thread(new WrappedRunnable()
+        {
+            public void runMayThrow() throws Exception
+            {
+                while (true)
+                {
+                    Thread.sleep(INTERVAL_IN_MS);
+                    deliverAllHints();
+                }
+            }
+        }).start();
+    }
+
     private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws IOException
     {
         if (!Gossiper.instance().isKnownEndpoint(endPoint))
@@ -160,7 +175,7 @@
     }
 
     /** hintStore must be the hints columnfamily from the system table */
-    private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
+    private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started deliverAllHints");
@@ -172,6 +187,7 @@
         // 5. Now force a flush
         // 6. Do major compaction to clean up all deletes etc.
         // 7. I guess we are done
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         for (String tableName : DatabaseDescriptor.getTables())
         {
             byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
@@ -180,9 +196,7 @@
                 QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
                 ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
                 if (hintColumnFamily == null)
-                {
                     break;
-                }
                 Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
 
                 for (IColumn keyColumn : keys)
@@ -230,20 +244,17 @@
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recipients if the endpoint matches send
         // 3. Delete that recipient from the key if write was successful
-        Table systemTable = Table.open(Table.SYSTEM_TABLE);
-        ColumnFamilyStore hintStore = systemTable.getColumnFamilyStore(HINTS_CF);
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         for (String tableName : DatabaseDescriptor.getTables())
         {
             byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
             while (true)
             {
                 QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
-                ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
-                if (hintedColumnFamily == null)
-                {
+                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+                if (hintColumnFamily == null)
                     break;
-                }
-                Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
+                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
 
                 for (IColumn keyColumn : keys)
                 {
@@ -254,13 +265,9 @@
                         if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
                         {
                             if (endpoints.size() == 1)
-                            {
                                 deleteHintKey(tableName, keyColumn.name());
-                            }
                             else
-                            {
                                 deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
-                            }
                             break;
                         }
                     }
@@ -274,25 +281,6 @@
           logger_.debug("Finished hinted handoff for endpoint " + endPoint);
     }
 
-    public void scheduleHandoffsFor(final ColumnFamilyStore columnFamilyStore)
-    {
-        final Runnable r = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                deliverAllHints(columnFamilyStore);
-            }
-        };
-        TimerTask task = new TimerTask()
-        {
-            public void run()
-            {
-                executor_.execute(r);
-            }
-        };
-        timer.schedule(task, INTERVAL_IN_MS, INTERVAL_IN_MS);
-    }
-
     /*
      * This method is used to deliver hints to a particular endpoint.
      * When we learn that some endpoint is back up we deliver the data