You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/11/24 22:26:24 UTC
[7/8] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/repair/RepairJob.java
src/java/org/apache/cassandra/repair/RepairSession.java
src/java/org/apache/cassandra/service/ActiveRepairService.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/service/StorageServiceMBean.java
src/java/org/apache/cassandra/tools/NodeCmd.java
src/java/org/apache/cassandra/tools/NodeProbe.java
src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/326a9ff2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/326a9ff2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/326a9ff2
Branch: refs/heads/cassandra-2.1
Commit: 326a9ff2f831eeafedbc37b7a4b8f8f4a709e399
Parents: eac7781 41469ec
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 24 15:21:34 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 24 15:21:34 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DatacenterAwareRequestCoordinator.java | 73 +++++++++++
.../cassandra/repair/IRequestCoordinator.java | 28 ++++
.../cassandra/repair/IRequestProcessor.java | 23 ++++
.../repair/ParallelRequestCoordinator.java | 49 +++++++
.../org/apache/cassandra/repair/RepairJob.java | 32 ++++-
.../cassandra/repair/RepairParallelism.java | 22 ++++
.../apache/cassandra/repair/RepairSession.java | 14 +-
.../cassandra/repair/RequestCoordinator.java | 128 -------------------
.../repair/SequentialRequestCoordinator.java | 58 +++++++++
.../cassandra/service/ActiveRepairService.java | 6 +-
.../cassandra/service/StorageService.java | 49 +++++--
.../cassandra/service/StorageServiceMBean.java | 20 ++-
.../org/apache/cassandra/tools/NodeProbe.java | 29 +++--
.../org/apache/cassandra/tools/NodeTool.java | 14 +-
.../repair/RequestCoordinatorTest.java | 124 ++++++++++++++++++
16 files changed, 499 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9e35d5,7519653..fa3ce8a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -26,34 -12,7 +26,35 @@@ Merged from 2.0
* Avoid overlap in L1 when L0 contains many nonoverlapping
sstables (CASSANDRA-8211)
* Improve PropertyFileSnitch logging (CASSANDRA-8183)
- * Abort liveRatio calculation if the memtable is flushed (CASSANDRA-8164)
++ * Add DC-aware sequential repair (CASSANDRA-8193)
+
+
+2.1.2
+ * (cqlsh) parse_for_table_meta errors out on queries with undefined
+ grammars (CASSANDRA-8262)
+ * (cqlsh) Fix SELECT ... TOKEN() function broken in C* 2.1.1 (CASSANDRA-8258)
+ * Fix Cassandra crash when running on JDK8 update 40 (CASSANDRA-8209)
+ * Optimize partitioner tokens (CASSANDRA-8230)
+ * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
+ * Make cache serializers pluggable (CASSANDRA-8096)
+ * Fix issues with CONTAINS (KEY) queries on secondary indexes
+ (CASSANDRA-8147)
+ * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
+ * Fix default timestamp in QueryOptions (CASSANDRA-8246)
+ * Set socket timeout when reading remote version (CASSANDRA-8188)
+ * Refactor how we track live size (CASSANDRA-7852)
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
+ * Fix shutdown when run as Windows service (CASSANDRA-8136)
+ * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
+ * Fix race in RecoveryManagerTest (CASSANDRA-8176)
+ * Avoid IllegalArgumentException while sorting sstables in
+ IndexSummaryManager (CASSANDRA-8182)
+ * Shutdown JVM on file descriptor exhaustion (CASSANDRA-7579)
+ * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
+ * Fix installing as service on Windows (CASSANDRA-8115)
+ * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
+ * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)
+Merged from 2.0:
* Correctly handle non-text column names in cql3 (CASSANDRA-8178)
* Fix deletion for indexes on primary key columns (CASSANDRA-8206)
* Add 'nodetool statusgossip' (CASSANDRA-8125)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 8057ed5,7c791aa..20d5d97
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -73,12 -72,14 +73,14 @@@ public class RepairJo
ListeningExecutorService taskExecutor)
{
this.listener = listener;
- this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
- this.isSequential = isSequential;
+ this.parallelismDegree = parallelismDegree;
this.taskExecutor = taskExecutor;
- this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+
+ IRequestProcessor<InetAddress> processor = new IRequestProcessor<InetAddress>()
{
- public void send(InetAddress endpoint)
+ @Override
+ public void process(InetAddress endpoint)
{
ValidationRequest request = new ValidationRequest(desc, gcBefore);
MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 346f3f4,f2b95eb..0580ebb
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -114,20 -110,19 +114,20 @@@ public class RepairSession extends Wrap
*
* @param range range to repair
* @param keyspace name of keyspace
- * @param isSequential true if performing repair on snapshots sequentially
+ * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
- * @param dataCenters the data centers that should be part of the repair; null for all DCs
+ * @param endpoints the endpoints (nodes) that should be part of the repair
* @param cfnames names of columnfamilies
*/
- public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
- public RepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
++ public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames)
{
- this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames);
- this(UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++ this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, cfnames);
}
- public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String[] cfnames)
- public RepairSession(UUID id, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
++ public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String[] cfnames)
{
+ this.parentRepairSession = parentRepairSession;
this.id = id;
- this.isSequential = isSequential;
+ this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@@ -284,10 -270,10 +284,10 @@@
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
- RepairJob job = new RepairJob(this, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
++ RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
jobs.offer(job);
}
-
+ logger.debug("Sending tree requests to endpoints {}", endpoints);
jobs.peek().sendTreeRequests(endpoints);
// block whatever thread started this session until all requests have been returned:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 68c2fae,da81e8f..d43143e
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -116,9 -92,9 +116,9 @@@ public class ActiveRepairServic
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
- public RepairFuture submitRepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
++ public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames)
{
- RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
- RepairSession session = new RepairSession(range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++ RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames);
if (session.endpoints.isEmpty())
return null;
RepairFuture futureTask = new RepairFuture(session);
@@@ -152,9 -128,7 +152,9 @@@
// add it to the sessions (avoid NPE in tests)
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
- RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, null, null, new String[]{desc.columnFamily});
+ Set<InetAddress> neighbours = new HashSet<>();
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
- RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily});
++ RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 79cea8e,3d42d1c..38cca10
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2489,86 -2407,112 +2490,106 @@@ public class StorageService extends Not
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++ return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+ }
+
- public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
++ public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)
+ {
- // when repairing only primary range, dataCenter nor hosts can be set
- if (primaryRange && (dataCenters != null || hosts != null))
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (dataCenters == null && hosts == null)
+ ranges = getPrimaryRanges(keyspace);
+ // except when dataCenters contains only the local DC (i.e. -local)
+ else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ ranges = getPrimaryRangesWithinDC(keyspace);
+ else
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
}
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, columnFamilies);
+ else
+ {
+ ranges = getLocalRanges(keyspace);
+ }
+
- return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++ return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+ {
++ return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++ }
++
++ public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+ {
if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
+ int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
- if (!FBUtilities.isUnix() && isSequential)
- new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, columnFamilies)).start();
++ if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
+ {
+ logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
- isSequential = false;
++ parallelismDegree = RepairParallelism.PARALLEL;
+ }
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start();
++ new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)).start();
}
return cmd;
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
- // when repairing only primary range, you cannot repair only on local DC
- if (primaryRange && isLocal)
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
}
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, columnFamilies);
- }
-
- public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
- {
- if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
- return 0;
-
- final int cmd = nextRepairCommand.incrementAndGet();
- if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
+ else
{
- logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
- parallelismDegree = RepairParallelism.PARALLEL;
+ ranges = getLocalRanges(keyspace);
}
- new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, columnFamilies)).start();
- return cmd;
+
+ return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, columnFamilies);
++ return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, fullRepair, columnFamilies);
+ }
+
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
++ public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+ {
- Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
-
- logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
+ if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
+ return 0;
+ int cmd = nextRepairCommand.incrementAndGet();
- if (!FBUtilities.isUnix() && isSequential)
+ if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
{
logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
- isSequential = false;
+ parallelismDegree = RepairParallelism.PARALLEL;
}
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
- return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, columnFamilies);
- }
-
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
- {
- Set<String> dataCenters = null;
- if (isLocal)
- {
- dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
- }
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
++ new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, fullRepair, columnFamilies)).start();
+ return cmd;
}
- /**
- * Trigger proactive repair for a keyspace and column families.
- */
- public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
{
- forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies);
++ return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, fullRepair, columnFamilies);
+ }
+
- public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
++ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies)
+ {
- // primary range repair can only be performed for whole cluster.
- // NOTE: we should omit the param but keep API as is for now.
- if (isLocal)
- {
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
- }
+ Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false, columnFamilies);
+ logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+ repairingRange, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
++ return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
{
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
@@@ -2616,30 -2567,17 +2637,30 @@@
return repairingRange;
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final boolean isLocal, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(int cmd,
+ String keyspace,
+ Collection<Range<Token>> ranges,
- boolean isSequential,
++ RepairParallelism parallelismDegree,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
{
Set<String> dataCenters = null;
if (isLocal)
{
dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
}
- return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies);
- return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, columnFamilies);
++ return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, fullRepair, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(final int cmd,
+ final String keyspace,
+ final Collection<Range<Token>> ranges,
- final boolean isSequential,
++ final RepairParallelism parallelismDegree,
+ final Collection<String> dataCenters,
+ final Collection<String> hosts,
+ final boolean fullRepair,
+ final String... columnFamilies)
{
if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
{
@@@ -2650,71 -2588,24 +2671,71 @@@
{
protected void runMayThrow() throws Exception
{
- String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair);
- String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
++ String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", cmd, ranges.size(), keyspace, parallelismDegree, fullRepair);
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
- if (isSequential && !fullRepair)
- List<RepairFuture> futures = new ArrayList<>(ranges.size());
++ if (parallelismDegree != RepairParallelism.PARALLEL && !fullRepair)
+ {
+ message = "It is not possible to mix sequential repair and incremental repairs.";
+ logger.error(message);
+ sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
+
+ Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
- RepairFuture future;
try
{
- future = forceKeyspaceRepair(range, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
}
catch (IllegalArgumentException e)
{
- logger.error("Repair session failed:", e);
- sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
- continue;
+ logger.error("Repair failed:", e);
+ sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
}
+ }
+
+ // Validate columnfamilies
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ try
+ {
+ Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies));
+ }
+ catch (IllegalArgumentException e)
+ {
+ sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
+
+ UUID parentSession = null;
+ if (!fullRepair)
+ {
+ try
+ {
+ parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+ }
+ catch (Throwable t)
+ {
+ sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
+ }
+
+ List<RepairFuture> futures = new ArrayList<>(ranges.size());
+ String[] cfnames = new String[columnFamilyStores.size()];
+ for (int i = 0; i < columnFamilyStores.size(); i++)
+ {
+ cfnames[i] = columnFamilyStores.get(i).name;
+ }
+ for (Range<Token> range : ranges)
+ {
- RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, isSequential, rangeToNeighbors.get(range), cfnames);
++ RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, parallelismDegree, rangeToNeighbors.get(range), cfnames);
if (future == null)
continue;
futures.add(future);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8ae44ff,2386fc8..e7d6f14
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -272,14 -259,30 +274,30 @@@ public interface StorageServiceMBean ex
*
* @return Repair command number, or 0 if nothing to repair
*/
- public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
/**
+ * Invoke repair asynchronously.
+ * You can track repair progress by subscribing to JMX notifications sent from this StorageServiceMBean.
+ * Notification format is:
+ * type: "repair"
+ * userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
+ */
- public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
++ public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies);
+
+ /**
* Same as forceRepairAsync, but handles a specified range
*/
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies);
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws IOException;
/**
+ * Same as forceRepairAsync, but handles a specified range
+ */
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies);
++ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies);
+
+ /**
* Invoke repair asynchronously.
* You can track repair progress by subscribing to JMX notifications sent from this StorageServiceMBean.
* Notification format is:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index d495786,261d416..1d05887
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -244,14 -211,24 +245,19 @@@ public class NodeProbe implements AutoC
ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
}
- public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
- {
- ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
- }
-
- public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies) throws IOException
+ public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++ forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+ }
+
- public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies) throws IOException
++ public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
+ {
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange, fullRepair))
- if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange))
++ if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange, fullRepair))
failed = true;
}
catch (Exception e)
@@@ -265,22 -242,22 +271,27 @@@
ssProxy.removeNotificationListener(runner);
jmxc.removeConnectionNotificationListener(runner);
}
- catch (Throwable ignored) {}
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ out.println("Exception occurred during clean-up. " + t);
+ }
}
}
- public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
+
+ public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
{
- forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, columnFamilies);
++ forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, fullRepair, columnFamilies);
+ }
+
- public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
++ public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
+ {
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, hosts, startToken, endToken, fullRepair))
- if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken))
++ if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken, fullRepair))
failed = true;
}
catch (Exception e)
@@@ -1287,16 -1070,16 +1298,16 @@@ class RepairRunner implements Notificat
this.columnFamilies = columnFamilies;
}
- public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
- public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
++ public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
{
- cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
- cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, columnFamilies);
++ cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
waitForRepair();
return success;
}
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception
++ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception
{
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, fullRepair, columnFamilies);
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies);
waitForRepair();
return success;
}