You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 19:03:17 UTC

svn commit: r771934 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java HintedHandOffManager.java RowMutation.java

Author: jbellis
Date: Tue May  5 17:03:15 2009
New Revision: 771934

URL: http://svn.apache.org/viewvc?rev=771934&view=rev
Log:
 A. Make hint generation include a real timestamp, so that deletes of hint data are meaningful.
 B. Call removeDeleted on the data read locally, to purge tombstones before that data is sent to the recovering endpoint.
 C. Because of (B), any supercolumn without subcolumns simply won't exist, so we can skip re-deleting the endpoint data; deleteKey therefore becomes deleteHintedData.
 D. Because deleted data is not immediately purged, increase the scheduled interval from 20 minutes to 60 minutes to reduce the load of scanning the hint CF.

patch by Jun Rao and jbellis for CASSANDRA-34

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May  5 17:03:15 2009
@@ -641,7 +641,7 @@
                 }
             }
             else if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)
-                     || c.timestamp() < cf.getMarkedForDeleteAt())
+                     || c.timestamp() <= cf.getMarkedForDeleteAt())
             {
                 cf.remove(cname);
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue May  5 17:03:15 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -53,7 +54,7 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
     public static final String key_ = "HintedHandOffKey";
-    final static long intervalInMins_ = 20;
+    final static long intervalInMins_ = 60;
     private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
 
 
@@ -85,26 +86,61 @@
 
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
         Row row = table.get(key);
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+        Row purgedRow = new Row(key);
+        for (ColumnFamily cf : row.getColumnFamilies())
+        {
+            purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
+        }
+        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), purgedRow);
         Message message = rm.makeRowMutationMessage();
-
         QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
         MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+
         return quorumResponseHandler.get();
     }
 
-    private static void deleteEndPoint(String endpointAddress, String key) throws Exception
+    private static void deleteEndPoint(String endpointAddress, String key, long timestamp) throws Exception
     {
         RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
+        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
         rm.apply();
     }
 
-    private static void deleteKey(String key) throws Exception
+    private static void deleteHintedData(String key) throws Exception
     {
-        // delete the hint record
+        // delete the row from Application CFs: find the largest timestamp in any of
+        // the data columns, and delete the entire CF with that value for the tombstone.
+
+        // Note that we delete all data associated with the key: this may be more than
+        // we sent earlier in sendMessage, since HH is not serialized with writes.
+        // This is sub-optimal but okay, since HH is just an effort to make a recovering
+        // node more consistent than it would have been; we can rely on the other
+        // consistency mechanisms to finish the job in this corner case.
         RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Row row = table.get(key); // not necessary to do removeDeleted here
+        Collection<ColumnFamily> cfs = row.getColumnFamilies();
+        for (ColumnFamily cf : cfs)
+        {
+            Set<IColumn> columns = cf.getAllColumns();
+            long maxTS = Long.MIN_VALUE;
+            if (!cf.isSuper())
+            {
+                for (IColumn col : columns)
+                    maxTS = Math.max(maxTS, col.timestamp());
+            }
+            else
+            {
+                for (IColumn col : columns)
+                {
+                    maxTS = Math.max(maxTS, col.timestamp());
+                    Collection<IColumn> subColumns = col.getSubColumns();
+                    for (IColumn subCol : subColumns)
+                        maxTS = Math.max(maxTS, subCol.timestamp());
+                }
+            }
+            rm.delete(cf.name(), maxTS);
+        }
         rm.apply();
     }
 
@@ -120,10 +156,9 @@
         // 6. Do major compaction to clean up all deletes etc.
         // 7. I guess we r done
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        ColumnFamily hintColumnFamily = null;
         try
         {
-            hintColumnFamily = table.get(key_, Table.hints_);
+            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), Integer.MAX_VALUE);
             if (hintColumnFamily == null)
             {
                 columnFamilyStore.forceFlush();
@@ -138,26 +173,18 @@
             for (IColumn keyColumn : keys)
             {
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                // endpoints could be null if the server were terminated during a previous runHints
-                // after deleteEndPoint but before deleteKey.
-                boolean allsuccess = true;
-                if (endpoints != null)
+                int deleted = 0;
+                for (IColumn endpoint : endpoints)
                 {
-                    for (IColumn endpoint : endpoints)
+                    if (sendMessage(endpoint.name(), keyColumn.name()))
                     {
-                        if (sendMessage(endpoint.name(), keyColumn.name()))
-                        {
-                            deleteEndPoint(endpoint.name(), keyColumn.name());
-                        }
-                        else
-                        {
-                            allsuccess = false;
-                        }
+                        deleteEndPoint(endpoint.name(), keyColumn.name(), keyColumn.timestamp());
+                        deleted++;
                     }
                 }
-                if (allsuccess)
+                if (deleted == endpoints.size())
                 {
-                    deleteKey(keyColumn.name());
+                    deleteHintedData(keyColumn.name());
                 }
             }
             columnFamilyStore.forceFlush();
@@ -181,10 +208,9 @@
         // 2. For each key read the list of recepients if teh endpoint matches send
         // 3. Delete that recepient from the key if write was successful
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        ColumnFamily hintedColumnFamily = null;
         try
         {
-            hintedColumnFamily = table.get(key_, Table.hints_);
+            ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
             if (hintedColumnFamily == null)
             {
                 return;
@@ -198,18 +224,14 @@
             for (IColumn keyColumn : keys)
             {
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                if (endpoints == null)
-                {
-                    deleteKey(keyColumn.name());
-                    continue;
-                }
                 for (IColumn endpoint : endpoints)
                 {
-                    if (endpoint.name().equals(endPoint.getHost()))
+                    if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), keyColumn.name()))
                     {
-                        if (sendMessage(endpoint.name(), keyColumn.name()))
+                        deleteEndPoint(endpoint.name(), keyColumn.name(), keyColumn.timestamp());
+                        if (endpoints.size() == 1)
                         {
-                            deleteEndPoint(endpoint.name(), keyColumn.name());
+                            deleteHintedData(keyColumn.name());
                         }
                     }
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue May  5 17:03:15 2009
@@ -120,7 +120,7 @@
     void addHints(String hint) throws IOException
     {
         String cfName = Table.hints_ + ":" + hint;
-        add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+        add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
     }
 
     /*