You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/11/17 10:19:00 UTC
[1/4] cassandra git commit: Reject incremental repair requests combined with subrange repair
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 c0480d8bb -> eb8afb71f
Reject incremental repair requests combined with subrange repair
Patch by Ariel Weisberg; reviewed by marcuse for CASSANDRA-10422
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a8e8a673
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a8e8a673
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a8e8a673
Branch: refs/heads/cassandra-3.0
Commit: a8e8a67306c0b26b8fe9c74a1fb00bacfa224cf7
Parents: 7e056fa
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Thu Oct 29 12:36:32 2015 -0400
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Nov 17 09:57:15 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/service/StorageService.java | 3 +++
.../org/apache/cassandra/service/StorageServiceServerTest.java | 6 ++++++
3 files changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8e8a673/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2eeda94..b6b394a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Reject incremental repair with subrange repair (CASSANDRA-10422)
* Add a nodetool command to refresh size_estimates (CASSANDRA-9579)
* Shutdown compaction in drain to prevent leak (CASSANDRA-10079)
* Invalidate cache after stream receive task is completed (CASSANDRA-10341)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8e8a673/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 665ce3a..03c1960 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2830,6 +2830,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
{
+ if (!fullRepair)
+ throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair because " +
+ "each subrange repair would generate an anti-compacted table");
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8e8a673/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index dd25b35..564239b 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -508,4 +508,10 @@ public class StorageServiceServerTest
repairRangeFrom = StorageService.instance.createRepairRangeFrom("2000", "2000");
assert repairRangeFrom.size() == 0;
}
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testIncrementalRepairWithSubrangesThrows() throws Exception
+ {
+ StorageService.instance.forceRepairRangeAsync("", "", "", true, true, false, "");
+ }
}
[3/4] cassandra git commit: Don't do anticompaction after subrange repair
Posted by ma...@apache.org.
Don't do anticompaction after subrange repair
Patch by Ariel Weisberg; reviewed by marcuse for CASSANDRA-10422
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99b82dbb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99b82dbb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99b82dbb
Branch: refs/heads/cassandra-3.0
Commit: 99b82dbb43277035562e7b82bb9bdebd84510e96
Parents: d434a33
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 10 13:08:05 2015 -0500
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Nov 17 10:07:59 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/messages/RepairOption.java | 19 ++++++++++++++-----
.../cassandra/service/ActiveRepairService.java | 2 ++
.../apache/cassandra/service/StorageService.java | 9 +++++----
.../repair/messages/RepairOptionTest.java | 10 +++++++---
5 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99b82dbb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 489a76d..f5d3416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.4
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99b82dbb/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 1780b6b..d50a2ed 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -145,8 +145,9 @@ public class RepairOption
if (rangesStr != null)
{
if (incremental)
- throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " +
- "because each subrange repair would generate an anti-compacted table");
+ logger.warn("Incremental repair can't be requested with subrange repair " +
+ "because each subrange repair would generate an anti-compacted table. " +
+ "The repair will occur but without anti-compaction.");
StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
while (tokenizer.hasMoreTokens())
{
@@ -161,7 +162,7 @@ public class RepairOption
}
}
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges);
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty());
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -220,13 +221,14 @@ public class RepairOption
private final boolean incremental;
private final boolean trace;
private final int jobThreads;
+ private final boolean isSubrangeRepair;
private final Collection<String> columnFamilies = new HashSet<>();
private final Collection<String> dataCenters = new HashSet<>();
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair)
{
if (FBUtilities.isWindows() &&
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -243,6 +245,7 @@ public class RepairOption
this.trace = trace;
this.jobThreads = jobThreads;
this.ranges.addAll(ranges);
+ this.isSubrangeRepair = isSubrangeRepair;
}
public RepairParallelism getParallelism()
@@ -292,8 +295,14 @@ public class RepairOption
public boolean isGlobal()
{
- return dataCenters.isEmpty() && hosts.isEmpty();
+ return dataCenters.isEmpty() && hosts.isEmpty() && !isSubrangeRepair();
}
+
+ public boolean isSubrangeRepair()
+ {
+ return isSubrangeRepair;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99b82dbb/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index a6389ea..0cb4252 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -353,6 +353,8 @@ public class ActiveRepairService
{
assert parentRepairSession != null;
ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+ //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions
+ //in addition to other scenarios such as repairs not involving all DCs or hosts
if (!prs.isGlobal)
{
logger.info("Not a global repair, will not do anticompaction");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99b82dbb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b5ce38b..80672dd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2882,7 +2882,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
parallelism = RepairParallelism.PARALLEL;
}
- RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList());
+ RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false);
if (dataCenters != null)
{
options.getDataCenters().addAll(dataCenters);
@@ -2966,11 +2966,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (!fullRepair)
- throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " +
- "because each subrange repair would generate an anti-compacted table");
+ logger.warn("Incremental repair can't be requested with subrange repair " +
+ "because each subrange repair would generate an anti-compacted table. " +
+ "The repair will occur but without anti-compaction.");
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange);
+ RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true);
options.getDataCenters().addAll(dataCenters);
if (hosts != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99b82dbb/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 3257a10..cc6f46a 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -96,10 +96,14 @@ public class RepairOptionTest
assertEquals(expectedHosts, option.getHosts());
}
- @Test(expected=IllegalArgumentException.class)
- public void testIncrementalRepairWithSubrangesThrows() throws Exception
+ @Test
+ public void testIncrementalRepairWithSubrangesIsNotGlobal() throws Exception
{
- RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, ""),
+ RepairOption ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, "42:42"),
Murmur3Partitioner.instance);
+ assertFalse(ro.isGlobal());
+ ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, ""),
+ Murmur3Partitioner.instance);
+ assertTrue(ro.isGlobal());
}
}
[4/4] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb8afb71
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb8afb71
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb8afb71
Branch: refs/heads/cassandra-3.0
Commit: eb8afb71f52c65855efa1d7e8d3ef314ab7373e2
Parents: c0480d8 99b82db
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 17 10:13:18 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Nov 17 10:13:18 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/repair/messages/RepairOption.java | 18 +++++++++++++++---
.../cassandra/service/ActiveRepairService.java | 2 ++
.../apache/cassandra/service/StorageService.java | 9 +++++++--
.../repair/messages/RepairOptionTest.java | 17 +++++++++++++++--
5 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb8afb71/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8bb67e1,f5d3416..13647cc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,8 +1,58 @@@
-2.2.4
+3.0.1
+ * Correctly preserve deletion info on updated rows when notifying indexers
+ of single-row deletions (CASSANDRA-10694)
+ * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
+ * Keep the file open in trySkipCache (CASSANDRA-10669)
+ * Updated trigger example (CASSANDRA-10257)
+Merged from 2.2:
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+Merged from 2.1:
++ * Reject incremental repair with subrange repair (CASSANDRA-10422)
+ * Add a nodetool command to refresh size_estimates (CASSANDRA-9579)
+ * Invalidate cache after stream receive task is completed (CASSANDRA-10341)
+ * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
+ * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
+
+
+3.0
+ * Fix AssertionError while flushing memtable due to materialized views
+ incorrectly inserting empty rows (CASSANDRA-10614)
+ * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
+ * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
+ * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
+ * Skip sstable on clustering in names query (CASSANDRA-10571)
+ * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
+ * Fix bootstrapping with MVs (CASSANDRA-10621)
+ * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
+ * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
+ * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
+ * Fix reading of legacy sstables (CASSANDRA-10590)
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
+ * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
+ * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
+ * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581,10628)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
* (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
* Use most up-to-date version of schema for system tables (CASSANDRA-10652)
* Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb8afb71/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb8afb71/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb8afb71/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
[2/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d434a33a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d434a33a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d434a33a
Branch: refs/heads/cassandra-3.0
Commit: d434a33ace2dfe6715f4857f9537ee884f4ef410
Parents: 73a730f a8e8a67
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 17 10:07:04 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Nov 17 10:07:04 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/messages/RepairOption.java | 3 +++
.../org/apache/cassandra/service/StorageService.java | 4 ++++
.../cassandra/repair/messages/RepairOptionTest.java | 13 +++++++++++--
4 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5705453,b6b394a..489a76d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
-2.1.12
+2.2.4
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Reject incremental repair with subrange repair (CASSANDRA-10422)
* Add a nodetool command to refresh size_estimates (CASSANDRA-9579)
* Shutdown compaction in drain to prevent leak (CASSANDRA-10079)
* Invalidate cache after stream receive task is completed (CASSANDRA-10341)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/messages/RepairOption.java
index f3e452c,0000000..1780b6b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@@ -1,308 -1,0 +1,311 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.tools.nodetool.Repair;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Repair options.
+ */
+public class RepairOption
+{
+ public static final String PARALLELISM_KEY = "parallelism";
+ public static final String PRIMARY_RANGE_KEY = "primaryRange";
+ public static final String INCREMENTAL_KEY = "incremental";
+ public static final String JOB_THREADS_KEY = "jobThreads";
+ public static final String RANGES_KEY = "ranges";
+ public static final String COLUMNFAMILIES_KEY = "columnFamilies";
+ public static final String DATACENTERS_KEY = "dataCenters";
+ public static final String HOSTS_KEY = "hosts";
+ public static final String TRACE_KEY = "trace";
+
+ // we don't want to push nodes too much for repair
+ public static final int MAX_JOB_THREADS = 4;
+
+ private static final Logger logger = LoggerFactory.getLogger(RepairOption.class);
+
+ /**
+ * Construct RepairOptions object from given map of Strings.
+ * <p>
+ * Available options are:
+ *
+ * <table>
+ * <caption>Repair Options</caption>
+ * <thead>
+ * <tr>
+ * <th>key</th>
+ * <th>value</th>
+ * <th>default (when key not given)</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>parallelism</td>
+ * <td>"sequential", "parallel" or "dc_parallel"</td>
+ * <td>"sequential"</td>
+ * </tr>
+ * <tr>
+ * <td>primaryRange</td>
+ * <td>"true" if perform repair only on primary range.</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
+ * <td>incremental</td>
+ * <td>"true" if perform incremental repair.</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
+ * <td>trace</td>
+ * <td>"true" if repair is traced.</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
+ * <td>jobThreads</td>
+ * <td>Number of threads to use to run repair job.</td>
+ * <td>1</td>
+ * </tr>
+ * <tr>
+ * <td>ranges</td>
+ * <td>Ranges to repair. A range is expressed as <start token>:<end token>
+ * and multiple ranges can be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>columnFamilies</td>
+ * <td>Specify names of ColumnFamilies to repair.
+ * Multiple ColumnFamilies can be given as comma separated values(e.g. cf1,cf2,cf3).</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>dataCenters</td>
+ * <td>Specify names of data centers who participate in this repair.
+ * Multiple data centers can be given as comma separated values(e.g. dc1,dc2,dc3).</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>hosts</td>
+ * <td>Specify names of hosts who participate in this repair.
+ * Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td>
+ * <td></td>
+ * </tr>
+ * </tbody>
+ * </table>
+ *
+ * @param options options to parse
+ * @param partitioner partitioner is used to construct token ranges
+ * @return RepairOptions object
+ */
+ public static RepairOption parse(Map<String, String> options, IPartitioner partitioner)
+ {
+ // if no parallel option is given, then this will be "sequential" by default.
+ RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY));
+ boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
+ boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+ boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
+
+ int jobThreads = 1;
+ if (options.containsKey(JOB_THREADS_KEY))
+ {
+ try
+ {
+ jobThreads = Integer.parseInt(options.get(JOB_THREADS_KEY));
+ }
+ catch (NumberFormatException ignore) {}
+ }
+ // ranges
+ String rangesStr = options.get(RANGES_KEY);
+ Set<Range<Token>> ranges = new HashSet<>();
+ if (rangesStr != null)
+ {
++ if (incremental)
++ throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " +
++ "because each subrange repair would generate an anti-compacted table");
+ StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
+ while (tokenizer.hasMoreTokens())
+ {
+ String[] rangeStr = tokenizer.nextToken().split(":", 2);
+ if (rangeStr.length < 2)
+ {
+ continue;
+ }
+ Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim());
+ Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim());
+ ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
+ }
+ }
+
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges);
+
+ // data centers
+ String dataCentersStr = options.get(DATACENTERS_KEY);
+ Collection<String> dataCenters = new HashSet<>();
+ if (dataCentersStr != null)
+ {
+ StringTokenizer tokenizer = new StringTokenizer(dataCentersStr, ",");
+ while (tokenizer.hasMoreTokens())
+ {
+ dataCenters.add(tokenizer.nextToken().trim());
+ }
+ option.getDataCenters().addAll(dataCenters);
+ }
+
+ // hosts
+ String hostsStr = options.get(HOSTS_KEY);
+ Collection<String> hosts = new HashSet<>();
+ if (hostsStr != null)
+ {
+ StringTokenizer tokenizer = new StringTokenizer(hostsStr, ",");
+ while (tokenizer.hasMoreTokens())
+ {
+ hosts.add(tokenizer.nextToken().trim());
+ }
+ option.getHosts().addAll(hosts);
+ }
+
+ // columnfamilies
+ String cfStr = options.get(COLUMNFAMILIES_KEY);
+ if (cfStr != null)
+ {
+ Collection<String> columnFamilies = new HashSet<>();
+ StringTokenizer tokenizer = new StringTokenizer(cfStr, ",");
+ while (tokenizer.hasMoreTokens())
+ {
+ columnFamilies.add(tokenizer.nextToken().trim());
+ }
+ option.getColumnFamilies().addAll(columnFamilies);
+ }
+
+ // validate options
+ if (jobThreads > MAX_JOB_THREADS)
+ {
+ throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS);
+ }
+ if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty()))
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
+
+ return option;
+ }
+
+ private final RepairParallelism parallelism;
+ private final boolean primaryRange;
+ private final boolean incremental;
+ private final boolean trace;
+ private final int jobThreads;
+
+ private final Collection<String> columnFamilies = new HashSet<>();
+ private final Collection<String> dataCenters = new HashSet<>();
+ private final Collection<String> hosts = new HashSet<>();
+ private final Collection<Range<Token>> ranges = new HashSet<>();
+
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges)
+ {
+ if (FBUtilities.isWindows() &&
+ (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
+ parallelism == RepairParallelism.SEQUENTIAL)
+ {
+ logger.warn("Sequential repair disabled when memory-mapped I/O is configured on Windows. Reverting to parallel.");
+ this.parallelism = RepairParallelism.PARALLEL;
+ }
+ else
+ this.parallelism = parallelism;
+
+ this.primaryRange = primaryRange;
+ this.incremental = incremental;
+ this.trace = trace;
+ this.jobThreads = jobThreads;
+ this.ranges.addAll(ranges);
+ }
+
+ public RepairParallelism getParallelism()
+ {
+ return parallelism;
+ }
+
+ public boolean isPrimaryRange()
+ {
+ return primaryRange;
+ }
+
+ public boolean isIncremental()
+ {
+ return incremental;
+ }
+
+ public boolean isTraced()
+ {
+ return trace;
+ }
+
+ public int getJobThreads()
+ {
+ return jobThreads;
+ }
+
+ public Collection<String> getColumnFamilies()
+ {
+ return columnFamilies;
+ }
+
+ public Collection<Range<Token>> getRanges()
+ {
+ return ranges;
+ }
+
+ public Collection<String> getDataCenters()
+ {
+ return dataCenters;
+ }
+
+ public Collection<String> getHosts()
+ {
+ return hosts;
+ }
+
+ public boolean isGlobal()
+ {
+ return dataCenters.isEmpty() && hosts.isEmpty();
+ }
+ @Override
+ public String toString()
+ {
+ return "repair options (" +
+ "parallelism: " + parallelism +
+ ", primary range: " + primaryRange +
+ ", incremental: " + incremental +
+ ", job threads: " + jobThreads +
+ ", ColumnFamilies: " + columnFamilies +
+ ", dataCenters: " + dataCenters +
+ ", hosts: " + hosts +
+ ", # of ranges: " + ranges.size() +
+ ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 74b3c73,03c1960..b5ce38b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2958,47 -2819,25 +2958,51 @@@ public class StorageService extends Not
{
throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree);
}
- Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
-
- logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
-
RepairParallelism parallelism = RepairParallelism.values()[parallelismDegree];
- return forceRepairAsync(keyspaceName, parallelism, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
- }
+ if (FBUtilities.isWindows() && parallelism != RepairParallelism.PARALLEL)
+ {
+ logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
+ parallelism = RepairParallelism.PARALLEL;
+ }
+
- public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
- {
+ if (!fullRepair)
- throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair because " +
- "each subrange repair would generate an anti-compacted table");
++ throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " +
++ "because each subrange repair would generate an anti-compacted table");
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
+ RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange);
+ options.getDataCenters().addAll(dataCenters);
+ if (hosts != null)
+ {
+ options.getHosts().addAll(hosts);
+ }
+ if (columnFamilies != null)
+ {
+ for (String columnFamily : columnFamilies)
+ {
+ options.getColumnFamilies().add(columnFamily);
+ }
+ }
+
logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, repairingRange, fullRepair, columnFamilies);
+ repairingRange, keyspaceName, columnFamilies);
+ return forceRepairAsync(keyspaceName, options);
+ }
+
+ public int forceRepairRangeAsync(String beginToken,
+ String endToken,
+ String keyspaceName,
+ boolean isSequential,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
+ {
+ Set<String> dataCenters = null;
+ if (isLocal)
+ {
+ dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+ }
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, fullRepair, columnFamilies);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 11ae69f,0000000..3257a10
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@@ -1,96 -1,0 +1,105 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
++import com.google.common.collect.ImmutableMap;
++
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.*;
+
+public class RepairOptionTest
+{
+ @Test
+ public void testParseOptions()
+ {
+ IPartitioner partitioner = Murmur3Partitioner.instance;
+ Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
+
+ // parse with empty options
+ RepairOption option = RepairOption.parse(new HashMap<String, String>(), partitioner);
+
+ if (FBUtilities.isWindows() && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard))
+ assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
+ else
+ assertTrue(option.getParallelism() == RepairParallelism.SEQUENTIAL);
+
+ assertFalse(option.isPrimaryRange());
+ assertFalse(option.isIncremental());
+
+ // parse everything
+ Map<String, String> options = new HashMap<>();
+ options.put(RepairOption.PARALLELISM_KEY, "parallel");
+ options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
- options.put(RepairOption.INCREMENTAL_KEY, "true");
++ options.put(RepairOption.INCREMENTAL_KEY, "false");
+ options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30");
+ options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
+ options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3");
+ options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+
+ option = RepairOption.parse(options, partitioner);
+ assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
+ assertFalse(option.isPrimaryRange());
- assertTrue(option.isIncremental());
++ assertFalse(option.isIncremental());
+
+ Set<Range<Token>> expectedRanges = new HashSet<>(3);
+ expectedRanges.add(new Range<>(tokenFactory.fromString("0"), tokenFactory.fromString("10")));
+ expectedRanges.add(new Range<>(tokenFactory.fromString("11"), tokenFactory.fromString("20")));
+ expectedRanges.add(new Range<>(tokenFactory.fromString("21"), tokenFactory.fromString("30")));
+ assertEquals(expectedRanges, option.getRanges());
+
+ Set<String> expectedCFs = new HashSet<>(3);
+ expectedCFs.add("cf1");
+ expectedCFs.add("cf2");
+ expectedCFs.add("cf3");
+ assertEquals(expectedCFs, option.getColumnFamilies());
+
+ Set<String> expectedDCs = new HashSet<>(3);
+ expectedDCs.add("dc1");
+ expectedDCs.add("dc2");
+ expectedDCs.add("dc3");
+ assertEquals(expectedDCs, option.getDataCenters());
+
+ Set<String> expectedHosts = new HashSet<>(3);
+ expectedHosts.add("127.0.0.1");
+ expectedHosts.add("127.0.0.2");
+ expectedHosts.add("127.0.0.3");
+ assertEquals(expectedHosts, option.getHosts());
+ }
++
++ @Test(expected=IllegalArgumentException.class)
++ public void testIncrementalRepairWithSubrangesThrows() throws Exception
++ {
++ RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, ""),
++ Murmur3Partitioner.instance);
++ }
+}