You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/01 23:24:37 UTC
[2/3] cassandra git commit: Fix anticompaction blocking ANTI_ENTROPY stage
Fix anticompaction blocking ANTI_ENTROPY stage
patch by yukim; reviewed by sankalp kohli for CASSANDRA-9151
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d06f32a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d06f32a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d06f32a
Branch: refs/heads/trunk
Commit: 6d06f32a7729524a195ad67ba82157e390dc3912
Parents: 9b6f55b
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri May 1 15:06:35 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 1 15:06:35 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 11 ++--
.../repair/RepairMessageVerbHandler.java | 15 +----
.../cassandra/service/ActiveRepairService.java | 61 ++++++++++++++------
.../cassandra/service/StorageService.java | 6 +-
5 files changed, 55 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 627cc6b..a6e4e41 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
* Update tuple and collection types that use a user-defined type when that UDT
is modified (CASSANDRA-9148, CASSANDRA-9192)
* Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
+ * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
Merged from 2.0:
* Fix index selection during rebuild with certain table layouts (CASSANDRA-9281)
* Fix partition-level-delete-only workload accounting (CASSANDRA-9194)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 72deb21..7215945 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,8 +50,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -387,7 +386,7 @@ public class CompactionManager implements CompactionManagerMBean
});
}
- public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
+ public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
final long repairedAt)
@@ -417,7 +416,9 @@ public class CompactionManager implements CompactionManagerMBean
return Futures.immediateCancelledFuture();
}
- return executor.submit(runnable);
+ ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null);
+ executor.submit(task);
+ return task;
}
/**
@@ -483,7 +484,7 @@ public class CompactionManager implements CompactionManagerMBean
cfs.getDataTracker().unmarkCompacting(sstables);
}
- logger.info(String.format("Completed anticompaction successfully"));
+ logger.info("Completed anticompaction successfully");
}
public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c7cf4c8..5b25afa 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -112,20 +112,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
case ANTICOMPACTION_REQUEST:
logger.debug("Got anticompaction request");
AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
- try
- {
- List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
- FBUtilities.waitOnFutures(futures);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
- }
-
+ ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
break;
default:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index ac5ed99..5cc26ed 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.File;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -29,6 +28,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,6 @@ import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
@@ -57,7 +58,6 @@ import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -320,25 +320,31 @@ public class ActiveRepairService
return repairing;
}
- public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
+ /**
+ * Run final process of repair.
+ * This removes all resources held by parent repair session, after performing anti compaction if necessary.
+ *
+ * @param parentSession Parent session ID
+ * @param neighbors Repair participants (not including self)
+ * @param doAntiCompaction true if repair session needs anti compaction
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
{
- try
+ if (doAntiCompaction)
{
- if (doAntiCompaction)
+ for (InetAddress neighbor : neighbors)
{
- for (InetAddress neighbor : neighbors)
- {
- AnticompactionRequest acr = new AnticompactionRequest(parentSession);
- MessageOut<RepairMessage> req = acr.createMessage();
- MessagingService.instance().sendOneWay(req, neighbor);
- }
- List<Future<?>> futures = doAntiCompaction(parentSession);
- FBUtilities.waitOnFutures(futures);
+ AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+ MessageOut<RepairMessage> req = acr.createMessage();
+ MessagingService.instance().sendOneWay(req, neighbor);
}
+ doAntiCompaction(parentSession).get();
}
- finally
+ else
{
- parentRepairSessions.remove(parentSession);
+ removeParentRepairSession(parentSession);
}
}
@@ -352,12 +358,19 @@ public class ActiveRepairService
return parentRepairSessions.remove(parentSessionId);
}
- public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException
+ /**
+ * Submit anti-compaction jobs to CompactionManager.
+ * When all jobs are done, the parent repair session is removed whether or not they succeeded.
+ *
+ * @param parentRepairSession parent repair session ID
+ * @return Future result of all anti-compaction jobs.
+ */
+ public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession)
{
assert parentRepairSession != null;
ParentRepairSession prs = getParentRepairSession(parentRepairSession);
- List<Future<?>> futures = new ArrayList<>();
+ List<ListenableFuture<?>> futures = new ArrayList<>();
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
@@ -365,7 +378,17 @@ public class ActiveRepairService
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
}
- return futures;
+ ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures);
+ allAntiCompactionResults.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ removeParentRepairSession(parentRepairSession);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+
+ return allAntiCompactionResults;
}
public void handleMessage(InetAddress endpoint, RepairMessage message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d8fa831..8521256 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2732,7 +2732,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
{
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies);
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
+ isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
+ dataCenters, hosts, fullRepair, columnFamilies);
}
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies)
@@ -2939,7 +2941,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
if (!fullRepair)
+ {
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful);
+ }
sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
}
}, null);