You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/01 23:24:37 UTC

[2/3] cassandra git commit: Fix anticompaction blocking ANTI_ENTROPY stage

Fix anticompaction blocking ANTI_ENTROPY stage

patch by yukim; reviewed by sankalp kohli for CASSANDRA-9151


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d06f32a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d06f32a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d06f32a

Branch: refs/heads/trunk
Commit: 6d06f32a7729524a195ad67ba82157e390dc3912
Parents: 9b6f55b
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri May 1 15:06:35 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 1 15:06:35 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 11 ++--
 .../repair/RepairMessageVerbHandler.java        | 15 +----
 .../cassandra/service/ActiveRepairService.java  | 61 ++++++++++++++------
 .../cassandra/service/StorageService.java       |  6 +-
 5 files changed, 55 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 627cc6b..a6e4e41 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
  * Update tuple and collection types that use a user-defined type when that UDT
    is modified (CASSANDRA-9148, CASSANDRA-9192)
  * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
+ * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
 Merged from 2.0:
  * Fix index selection during rebuild with certain table layouts (CASSANDRA-9281)
  * Fix partition-level-delete-only workload accounting (CASSANDRA-9194)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 72deb21..7215945 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,8 +50,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -387,7 +386,7 @@ public class CompactionManager implements CompactionManagerMBean
         });
     }
 
-    public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
+    public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
                                           final long repairedAt)
@@ -417,7 +416,9 @@ public class CompactionManager implements CompactionManagerMBean
             return Futures.immediateCancelledFuture();
         }
 
-        return executor.submit(runnable);
+        ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null);
+        executor.submit(task);
+        return task;
     }
 
     /**
@@ -483,7 +484,7 @@ public class CompactionManager implements CompactionManagerMBean
             cfs.getDataTracker().unmarkCompacting(sstables);
         }
 
-        logger.info(String.format("Completed anticompaction successfully"));
+        logger.info("Completed anticompaction successfully");
     }
 
     public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c7cf4c8..5b25afa 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -112,20 +112,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 case ANTICOMPACTION_REQUEST:
                     logger.debug("Got anticompaction request");
                     AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
-                    try
-                    {
-                        List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
-                        FBUtilities.waitOnFutures(futures);
-                    }
-                    catch (Exception e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                    finally
-                    {
-                        ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
-                    }
-
+                    ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
                     break;
 
                 default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index ac5ed99..5cc26ed 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -29,6 +28,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +38,6 @@ import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
@@ -57,7 +58,6 @@ import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -320,25 +320,31 @@ public class ActiveRepairService
         return repairing;
     }
 
-    public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
+    /**
+     * Run final process of repair.
+     * This removes all resources held by parent repair session, after performing anti compaction if necessary.
+     *
+     * @param parentSession Parent session ID
+     * @param neighbors Repair participants (not including self)
+     * @param doAntiCompaction true if repair session needs anti compaction
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
     {
-        try
+        if (doAntiCompaction)
         {
-            if (doAntiCompaction)
+            for (InetAddress neighbor : neighbors)
             {
-                for (InetAddress neighbor : neighbors)
-                {
-                    AnticompactionRequest acr = new AnticompactionRequest(parentSession);
-                    MessageOut<RepairMessage> req = acr.createMessage();
-                    MessagingService.instance().sendOneWay(req, neighbor);
-                }
-                List<Future<?>> futures = doAntiCompaction(parentSession);
-                FBUtilities.waitOnFutures(futures);
+                AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+                MessageOut<RepairMessage> req = acr.createMessage();
+                MessagingService.instance().sendOneWay(req, neighbor);
             }
+            doAntiCompaction(parentSession).get();
         }
-        finally
+        else
         {
-            parentRepairSessions.remove(parentSession);
+            removeParentRepairSession(parentSession);
         }
     }
 
@@ -352,12 +358,19 @@ public class ActiveRepairService
         return parentRepairSessions.remove(parentSessionId);
     }
 
-    public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException
+    /**
+     * Submit anti-compaction jobs to CompactionManager.
+     * When all jobs are done, parent repair session is removed whether those are suceeded or not.
+     *
+     * @param parentRepairSession parent repair session ID
+     * @return Future result of all anti-compaction jobs.
+     */
+    public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession)
     {
         assert parentRepairSession != null;
         ParentRepairSession prs = getParentRepairSession(parentRepairSession);
 
-        List<Future<?>> futures = new ArrayList<>();
+        List<ListenableFuture<?>> futures = new ArrayList<>();
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
             Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
@@ -365,7 +378,17 @@ public class ActiveRepairService
             futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
         }
 
-        return futures;
+        ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures);
+        allAntiCompactionResults.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                removeParentRepairSession(parentRepairSession);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+
+        return allAntiCompactionResults;
     }
 
     public void handleMessage(InetAddress endpoint, RepairMessage message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d8fa831..8521256 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2732,7 +2732,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies);
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
+                                     isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
+                                     dataCenters, hosts, fullRepair, columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies)
@@ -2939,7 +2941,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     }
                 }
                 if (!fullRepair)
+                {
                     ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful);
+                }
                 sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
             }
         }, null);