You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/08 14:45:52 UTC

[4/7] cassandra git commit: Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)

Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)

If an error occurred during cleanup, scrub or upgrade
(or any parallelAllSSTableOperation), the caller was immediately
notified of the problem, and the method exited, executing the
finally block that unmarked all of the sstables as compacting.

Since the operations happen in parallel, many may still be running
or waiting to run, and so another operation may operate over the
same sstables, breaking the required mutual exclusivity.

This patch ensures the method is not exited until all operations
have completed, at which point the caller is notified of any
exceptions.

patch by benedict; reviewed by marcus for CASSANDRA-10274


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d769fcb3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d769fcb3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d769fcb3

Branch: refs/heads/cassandra-3.0
Commit: d769fcb397b6c5937561194b9e8f9dd596ffcd18
Parents: 0c0f1ff
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Sep 7 12:23:57 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Sep 8 13:39:56 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  6 +++
 .../db/compaction/CompactionManager.java        | 51 +++++---------------
 .../org/apache/cassandra/utils/FBUtilities.java | 19 ++++++--
 .../org/apache/cassandra/utils/Throwables.java  | 16 ++++++
 4 files changed, 51 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5dffb9b..fdba8ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
 2.2.2
  * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 
  * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
@@ -6,6 +7,11 @@
  * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
    (CASSANDRA-10199)
 Merged from 2.1:
+=======
+2.1.10
+ * Scrub, Cleanup and Upgrade do not unmark compacting until all operations
+   have completed, regardless of the occurence of exceptions (CASSANDRA-10274)
+>>>>>>> 04e789b... Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)
  * Fix handling of streaming EOF (CASSANDRA-10206)
  * Only check KeyCache when it is enabled
  * Change streaming_socket_timeout_in_ms default to 1 hour (CASSANDRA-8611)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5e1b31c..495c5ab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -41,8 +41,6 @@ import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -71,13 +69,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -246,6 +238,7 @@ public class CompactionManager implements CompactionManagerMBean
     @SuppressWarnings("resource")
     private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
     {
+        List<LifecycleTransaction> transactions = new ArrayList<>();
         try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
         {
             Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
@@ -255,7 +248,7 @@ public class CompactionManager implements CompactionManagerMBean
                 return AllSSTableOpStatus.SUCCESSFUL;
             }
 
-            List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>();
+            List<Future<Object>> futures = new ArrayList<>();
 
             for (final SSTableReader sstable : sstables)
             {
@@ -266,7 +259,8 @@ public class CompactionManager implements CompactionManagerMBean
                 }
 
                 final LifecycleTransaction txn = compacting.split(singleton(sstable));
-                futures.add(Pair.create(txn,executor.submit(new Callable<Object>()
+                transactions.add(txn);
+                futures.add(executor.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -274,39 +268,20 @@ public class CompactionManager implements CompactionManagerMBean
                         operation.execute(txn);
                         return this;
                     }
-                })));
+                }));
             }
 
             assert compacting.originals().isEmpty();
 
-
-            //Collect all exceptions
-            Exception exception = null;
-
-            for (Pair<LifecycleTransaction, Future<Object>> f : futures)
-            {
-                try
-                {
-                    f.right.get();
-                }
-                catch (InterruptedException | ExecutionException e)
-                {
-                    if (exception == null)
-                        exception = new Exception();
-
-                    exception.addSuppressed(e);
-                }
-                finally
-                {
-                    f.left.close();
-                }
-            }
-
-            if (exception != null)
-                Throwables.propagate(exception);
-
+            FBUtilities.waitOnFutures(futures);
             return AllSSTableOpStatus.SUCCESSFUL;
         }
+        finally
+        {
+            Throwable fail = Throwables.close(null, transactions);
+            if (fail != null)
+                logger.error("Failed to cleanup lifecycle transactions {}", fail);
+        }
     }
 
     private static interface OneSSTableOperation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index ce118b9..b41bdab 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -334,10 +334,23 @@ public class FBUtilities
         return System.currentTimeMillis() * 1000;
     }
 
-    public static void waitOnFutures(Iterable<Future<?>> futures)
+    public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures)
     {
-        for (Future f : futures)
-            waitOnFuture(f);
+        List<T> results = new ArrayList<>();
+        Throwable fail = null;
+        for (Future<? extends T> f : futures)
+        {
+            try
+            {
+                results.add(f.get());
+            }
+            catch (InterruptedException | ExecutionException e)
+            {
+                fail = Throwables.merge(fail, e);
+            }
+        }
+        Throwables.maybeFail(fail);
+        return results;
     }
 
     public static <T> T waitOnFuture(Future<T> future)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 0a2bd28..a895f31 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -34,4 +34,20 @@ public class Throwables
         if (fail != null)
             com.google.common.base.Throwables.propagate(fail);
     }
+
+    public static Throwable close(Throwable accumulate, Iterable<? extends AutoCloseable> closeables)
+    {
+        for (AutoCloseable closeable : closeables)
+        {
+            try
+            {
+                closeable.close();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
 }