You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/08 16:45:45 UTC

svn commit: r897233 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Author: jbellis
Date: Fri Jan  8 15:45:38 2010
New Revision: 897233

URL: http://svn.apache.org/viewvc?rev=897233&view=rev
Log:
add hint delivery paging.  patch by jbellis; tested by Brandon Williams for CASSANDRA-680

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=897233&r1=897232&r2=897233&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jan  8 15:45:38 2010
@@ -32,10 +32,15 @@
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 
 import java.net.InetAddress;
+
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -84,6 +89,7 @@
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
     final Timer timer = new Timer("HINTED-HANDOFF-TIMER");
     public static final String HINTS_CF = "HintsColumnFamily";
+    private static final int PAGE_SIZE = 10000;
 
 
     public static HintedHandOffManager instance()
@@ -168,29 +174,36 @@
         // 7. I guess we are done
         for (String tableName : DatabaseDescriptor.getTables())
         {
-            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(new IdentityQueryFilter(tableName, new QueryPath(HINTS_CF))), Integer.MAX_VALUE);
-            if (hintColumnFamily == null)
+            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+            while (true)
             {
-                continue;
-            }
-            Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
+                QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+                if (hintColumnFamily == null)
+                {
+                    break;
+                }
+                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
 
-            for (IColumn keyColumn : keys)
-            {
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                String keyStr = new String(keyColumn.name(), "UTF-8");
-                int deleted = 0;
-                for (IColumn endpoint : endpoints)
+                for (IColumn keyColumn : keys)
                 {
-                    if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
+                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                    String keyStr = new String(keyColumn.name(), "UTF-8");
+                    int deleted = 0;
+                    for (IColumn endpoint : endpoints)
                     {
-                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
-                        deleted++;
+                        if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
+                        {
+                            deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
+                            deleted++;
+                        }
                     }
-                }
-                if (deleted == endpoints.size())
-                {
-                    deleteHintKey(tableName, keyColumn.name());
+                    if (deleted == endpoints.size())
+                    {
+                        deleteHintKey(tableName, keyColumn.name());
+                    }
+
+                    startColumn = keyColumn.name(); // repeating the last as the first is fine since we just deleted it
                 }
             }
         }
@@ -218,33 +231,41 @@
         // 2. For each key read the list of recipients if the endpoint matches send
         // 3. Delete that recipient from the key if write was successful
         Table systemTable = Table.open(Table.SYSTEM_TABLE);
+        ColumnFamilyStore hintStore = systemTable.getColumnFamilyStore(HINTS_CF);
         for (String tableName : DatabaseDescriptor.getTables())
         {
-            ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(systemTable.get(tableName, HINTS_CF), Integer.MAX_VALUE);
-            if (hintedColumnFamily == null)
+            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+            while (true)
             {
-                continue;
-            }
-            Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
+                QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+                ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+                if (hintedColumnFamily == null)
+                {
+                    break;
+                }
+                Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
 
-            for (IColumn keyColumn : keys)
-            {
-                String keyStr = new String(keyColumn.name(), "UTF-8");
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                for (IColumn hintEndPoint : endpoints)
+                for (IColumn keyColumn : keys)
                 {
-                    if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
+                    String keyStr = new String(keyColumn.name(), "UTF-8");
+                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                    for (IColumn hintEndPoint : endpoints)
                     {
-                        if (endpoints.size() == 1)
+                        if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
                         {
-                            deleteHintKey(tableName, keyColumn.name());
+                            if (endpoints.size() == 1)
+                            {
+                                deleteHintKey(tableName, keyColumn.name());
+                            }
+                            else
+                            {
+                                deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
+                            }
+                            break;
                         }
-                        else
-                        {
-                            deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
-                        }
-                        break;
                     }
+
+                    startColumn = keyColumn.name(); // repeating the last as the first is fine since we just deleted it
                 }
             }
         }