You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/06/15 09:48:47 UTC
[01/10] cassandra git commit: Cache local ranges when calculating repair neighbors
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 1d2d0749a -> 2f74831f4
refs/heads/cassandra-2.2 52a827b4d -> c2566d1cf
refs/heads/cassandra-3.0 3c531cbec -> 70ee4ed49
refs/heads/trunk 4ed00607d -> 4210a25a9
Cache local ranges when calculating repair neighbors
patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f
Branch: refs/heads/cassandra-2.1
Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed
Parents: 1d2d074
Author: Mahdi Mohammadi <ma...@gmail.com>
Authored: Wed Jun 15 11:43:27 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:43:27 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/ActiveRepairService.java | 8 +++++---
.../org/apache/cassandra/service/StorageService.java | 6 +++++-
.../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d70902..ec2b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f8975f9..4c83c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
Set<InetAddress> neighbours = new HashSet<>();
- neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
+ Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace);
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null));
RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
@@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* Return all of the neighbors with whom we share the provided range.
*
* @param keyspaceName keyspace to repair
+ * @param keyspaceLocalRanges local-range for given keyspaceName
* @param toRepair token to repair
* @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
- for (Range<Token> range : ss.getLocalRanges(keyspaceName))
+ for (Range<Token> range : keyspaceLocalRanges)
{
if (range.contains(toRepair))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eea4556..27939f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
+ //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
+ //calculation multiple times
+ Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace);
+
Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
try
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts);
rangeToNeighbors.put(range, neighbors);
allNeighbors.addAll(neighbors);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ac39de6..21eb492 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next());
}
@Test(expected = IllegalArgumentException.class)
@@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
//Dont give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts);
}
Set<InetAddress> addTokens(int max) throws Throwable
[07/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by sn...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2566d1c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2566d1c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2566d1c
Branch: refs/heads/trunk
Commit: c2566d1cf9e239063a530ec08e8e098feffe387b
Parents: 52a827b 2f74831
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:44:57 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:44:57 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 9 +++++++--
.../cassandra/service/ActiveRepairService.java | 5 +++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d0ca37f,ec2b48e..d9afaa3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-2.1.15
+2.2.7
+ * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+ * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
+ * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
+ * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
+ * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
+ * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
+ * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+ * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
+ * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
+ * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
+ * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
+ * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
+ * JSON datetime formatting needs timezone (CASSANDRA-11137)
+ * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
+ * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
+ * Add missing files to debian packages (CASSANDRA-11642)
+ * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
+ * cqlsh: COPY FROM should use regular inserts for single statement batches and
+ report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
+ * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
+ * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
+Merged from 2.1:
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index b849cf8,0000000..f92310b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,409 -1,0 +1,414 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+{
+ private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
+
+ private StorageService storageService;
+ private final int cmd;
+ private final RepairOption options;
+ private final String keyspace;
+
+ private final List<ProgressListener> listeners = new ArrayList<>();
+
+ public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
+ {
+ this.storageService = storageService;
+ this.cmd = cmd;
+ this.options = options;
+ this.keyspace = keyspace;
+ }
+
+ @Override
+ public void addProgressListener(ProgressListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeProgressListener(ProgressListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ protected void fireProgressEvent(String tag, ProgressEvent event)
+ {
+ for (ProgressListener listener : listeners)
+ {
+ listener.progress(tag, event);
+ }
+ }
+
+ protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ final TraceState traceState;
+
+ final String tag = "repair:" + cmd;
+
+ final AtomicInteger progress = new AtomicInteger();
+ final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
+
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
+ columnFamilies);
+
+ final long startTime = System.currentTimeMillis();
+ String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
+ options);
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
+ if (options.isTraced())
+ {
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : validColumnFamilies)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+ cfsb.substring(2)));
+ Tracing.traceRepair(message);
+ traceState.enableActivityNotification(tag);
+ for (ProgressListener listener : listeners)
+ traceState.addProgressListener(listener);
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ }
+ else
+ {
+ traceState = null;
+ }
+
+ final Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
++
++ //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
++ //calculation multiple times
++ Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
++
+ try
+ {
+ for (Range<Token> range : options.getRanges())
+ {
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- options.getDataCenters(),
++ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
++ range, options.getDataCenters(),
+ options.getHosts());
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
+ }
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.error("Repair failed:", e);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ // Validate columnfamilies
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ try
+ {
+ Iterables.addAll(columnFamilyStores, validColumnFamilies);
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ String[] cfnames = new String[columnFamilyStores.size()];
+ for (int i = 0; i < columnFamilyStores.size(); i++)
+ {
+ cfnames[i] = columnFamilyStores.get(i).name;
+ }
+
+ final UUID parentSession = UUIDGen.getTimeUUID();
+ SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges());
+ long repairedAt;
+ try
+ {
+ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
+ progress.incrementAndGet();
+ }
+ catch (Throwable t)
+ {
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
+ return;
+ }
+
+ // Set up RepairJob executor for this repair command.
+ final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("Repair#" + cmd),
+ "internal"));
+
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+ for (Range<Token> range : options.getRanges())
+ {
+ final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ range,
+ keyspace,
+ options.getParallelism(),
+ rangeToNeighbors.get(range),
+ repairedAt,
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ // After repair session completes, notify client its result
+ Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
+ {
+ public void onSuccess(RepairSessionResult result)
+ {
+ /**
+ * If the success message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.getRange().toString());
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+
+ public void onFailure(Throwable t)
+ {
+ /**
+ * If the failure message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.getRange().toString(), t.getMessage());
+ logger.error(message, t);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+ });
+ futures.add(session);
+ }
+
+ // After all repair sessions completes(successful or not),
+ // run anticompaction if necessary and send finish notice back to client
+ final Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ final AtomicBoolean hasFailure = new AtomicBoolean();
+ final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+ ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>()
+ {
+ @SuppressWarnings("unchecked")
+ public ListenableFuture apply(List<RepairSessionResult> results) throws Exception
+ {
+ // filter out null(=failed) results and get successful ranges
+ for (RepairSessionResult sessionResult : results)
+ {
+ if (sessionResult != null)
+ {
+ successfulRanges.add(sessionResult.range);
+ }
+ else
+ {
+ hasFailure.compareAndSet(false, true);
+ }
+ }
+ return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+ }
+ });
+ Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
+ {
+ public void onSuccess(Object result)
+ {
+ SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
+ if (hasFailure.get())
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+ "Some repair failed"));
+ }
+ else
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+ "Repair completed successfully"));
+ }
+ repairComplete();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ repairComplete();
+ }
+
+ private void repairComplete()
+ {
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+ true, true);
+ String message = String.format("Repair command #%d finished in %s", cmd, duration);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+ logger.info(message);
+ if (options.isTraced() && traceState != null)
+ {
+ for (ProgressListener listener : listeners)
+ traceState.removeProgressListener(listener);
+ // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+ // run in a nondeterministic order (within the same thread), the
+ // TraceState may have been nulled out at this point. The TraceState
+ // should be traceState, so just set it without bothering to check if it
+ // actually was nulled out.
+ Tracing.instance.set(traceState);
+ Tracing.traceRepair(message);
+ Tracing.instance.stopSession();
+ }
+ executor.shutdownNow();
+ }
+ });
+ }
+
+ private Thread createQueryThread(final int cmd, final UUID sessionId)
+ {
+ return new Thread(new WrappedRunnable()
+ {
+ // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+ // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+ public void runMayThrow() throws Exception
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ if (state == null)
+ throw new Exception("no tracestate");
+
+ String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+ String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
+ SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+ ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ InetAddress source = FBUtilities.getBroadcastAddress();
+
+ HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+ int si = 0;
+ UUID uuid;
+
+ long tlast = System.currentTimeMillis(), tcur;
+
+ TraceState.Status status;
+ long minWaitMillis = 125;
+ long maxWaitMillis = 1000 * 1024L;
+ long timeout = minWaitMillis;
+ boolean shouldDouble = false;
+
+ while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+ {
+ if (status == TraceState.Status.IDLE)
+ {
+ timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+ shouldDouble = !shouldDouble;
+ }
+ else
+ {
+ timeout = minWaitMillis;
+ shouldDouble = false;
+ }
+ ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+ ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
+ tminBytes,
+ tmaxBytes));
+ ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+ UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+ for (UntypedResultSet.Row r : result)
+ {
+ if (source.equals(r.getInetAddress("source")))
+ continue;
+ if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
+ seen[si].add(uuid);
+ if (seen[si == 0 ? 1 : 0].contains(uuid))
+ continue;
+ String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+ fireProgressEvent("repair:" + cmd,
+ new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+ }
+ tlast = tcur;
+
+ si = si == 0 ? 1 : 0;
+ seen[si].clear();
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 7793660,26e5126..f34d0e2
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -50,180 -43,11 +50,182 @@@ import org.apache.cassandra.utils.concu
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-public class ActiveRepairServiceTest extends SchemaLoader
+public class ActiveRepairServiceTest
{
+ public static final String KEYSPACE5 = "Keyspace5";
+ public static final String CF_STANDARD1 = "Standard1";
+ public static final String CF_COUNTER = "Counter1";
- private static final String KEYSPACE1 = "Keyspace1";
- private static final String CF = "Standard1";
+ public String cfname;
+ public ColumnFamilyStore store;
+ public InetAddress LOCAL, REMOTE;
+
+ private boolean initialized;
+
+ /**
+ * One-time fixture for the whole test class: prepares the server through
+ * {@code SchemaLoader.prepareServer()} and then creates {@code KEYSPACE5} with
+ * {@code SimpleStrategy}, the options returned by {@code KSMetaData.optsWithRF(2)},
+ * and the two column families {@code CF_COUNTER} and {@code CF_STANDARD1}.
+ * Note that both column families are built with {@code standardCFMD}, including
+ * the one named {@code CF_COUNTER}.
+ *
+ * @throws ConfigurationException declared by the signature; presumably raised by the schema setup calls (TODO confirm)
+ */
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE5,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(2),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1));
+ }
+
+ /**
+ * Per-test fixture. The first time it runs on this instance it starts the gossiper and
+ * resolves the LOCAL and REMOTE addresses. On every run it clears the token metadata,
+ * hands a single random token to {@code StorageService.instance.setTokens} and registers
+ * REMOTE under the partitioner's minimum token.
+ */
+ @Before
+ public void prepare() throws Exception
+ {
+ if (!initialized)
+ {
+ SchemaLoader.startGossiper();
+ initialized = true;
+
+ LOCAL = FBUtilities.getBroadcastAddress();
+ // REMOTE is a made-up peer for which we can spoof receiving/sending trees
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ }
+
+ TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+ tokenMetadata.clearUnsafe();
+
+ Token localToken = StorageService.getPartitioner().getRandomToken();
+ StorageService.instance.setTokens(Collections.singleton(localToken));
+
+ Token remoteToken = StorageService.getPartitioner().getMinimumToken();
+ tokenMetadata.updateNormalToken(remoteToken, REMOTE);
+ assert tokenMetadata.isMember(REMOTE);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOne() throws Throwable
+ {
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwo() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ List<InetAddress> expected = new ArrayList<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5,
- StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
- null, hosts).iterator().next());
++ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
++ ranges.iterator().next(),
++ null, hosts).iterator().next());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
+ {
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ // Don't give the local endpoint
+ Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts);
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
++ ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts);
+ }
+
+ /**
+ * Registers {@code max} endpoints, 127.0.0.1 through 127.0.0.max, in the token metadata,
+ * each under a random token from the partitioner.
+ *
+ * @param max number of endpoints to register; the last address octet runs from 1 to max
+ * @return the set of addresses that were registered
+ */
+ Set<InetAddress> addTokens(int max) throws Throwable
+ {
+ TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+ Set<InetAddress> registered = new HashSet<>();
+ int lastOctet = 1;
+ while (lastOctet <= max)
+ {
+ InetAddress address = InetAddress.getByName("127.0.0." + lastOctet);
+ Token token = StorageService.getPartitioner().getRandomToken();
+ tokenMetadata.updateNormalToken(token, address);
+ registered.add(address);
+ lastOctet++;
+ }
+ return registered;
+ }
@Test
public void testGetActiveRepairedSSTableRefs()
[08/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by sn...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70ee4ed4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70ee4ed4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70ee4ed4
Branch: refs/heads/cassandra-3.0
Commit: 70ee4ed49a01a6466f8156c69f383ebc1d3b0e29
Parents: 3c531cb c2566d1
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:46:25 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:46:25 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 7 ++++++-
.../cassandra/service/ActiveRepairService.java | 7 +++++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2c42c94,d9afaa3..3eb420d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-2.2.7
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+Merged from 2.2:
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+Merged from 2.1:
++ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
+ * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
+ * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
+
+
+3.0.7
+ * Fix legacy serialization of Thrift-generated non-compound range tombstones
+ when communicating with 2.x nodes (CASSANDRA-11930)
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
* Persist local metadata earlier in startup sequence (CASSANDRA-11742)
* Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index 741cada,f92310b..21d0cd6
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -147,19 -146,22 +147,24 @@@ public class RepairRunnable extends Wra
}
final Set<InetAddress> allNeighbors = new HashSet<>();
- Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+ List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>();
+
+ // Pre-calculate the output of getLocalRanges once and pass it to getNeighbors,
+ // so that it is not recalculated for every range being repaired
+ Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
+
try
{
for (Range<Token> range : options.getRanges())
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
- range, options.getDataCenters(),
- options.getHosts());
- rangeToNeighbors.put(range, neighbors);
- allNeighbors.addAll(neighbors);
++ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+ options.getDataCenters(),
+ options.getHosts());
+
+ addRangeToNeighbors(commonRanges, range, neighbors);
+ allNeighbors.addAll(neighbors);
}
+
progress.incrementAndGet();
}
catch (IllegalArgumentException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 9f249e4,0bb7172..6b1fd83
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -182,7 -183,7 +183,9 @@@ public class ActiveRepairService implem
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
- public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
++ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges,
++ Range<Token> toRepair, Collection<String> dataCenters,
++ Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
[05/10] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by sn...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2566d1c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2566d1c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2566d1c
Branch: refs/heads/cassandra-3.0
Commit: c2566d1cf9e239063a530ec08e8e098feffe387b
Parents: 52a827b 2f74831
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:44:57 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:44:57 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 9 +++++++--
.../cassandra/service/ActiveRepairService.java | 5 +++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d0ca37f,ec2b48e..d9afaa3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-2.1.15
+2.2.7
+ * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+ * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
+ * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
+ * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
+ * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
+ * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
+ * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+ * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
+ * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
+ * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
+ * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
+ * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
+ * JSON datetime formatting needs timezone (CASSANDRA-11137)
+ * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
+ * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660)
+ * Add missing files to debian packages (CASSANDRA-11642)
+ * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
+ * cqlsh: COPY FROM should use regular inserts for single statement batches and
+ report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
+ * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
+ * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
+Merged from 2.1:
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index b849cf8,0000000..f92310b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,409 -1,0 +1,414 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+{
+ private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
+
+ private StorageService storageService;
+ private final int cmd;
+ private final RepairOption options;
+ private final String keyspace;
+
+ private final List<ProgressListener> listeners = new ArrayList<>();
+
+ /**
+ * Stores the collaborators for a single repair command; no work is started here.
+ *
+ * @param storageService service used later to look up valid column families and local ranges
+ * @param cmd repair command number; it is appended to "repair:" to form the progress tag
+ * @param options repair options for this command
+ * @param keyspace name of the keyspace to repair
+ */
+ public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
+ {
+ this.storageService = storageService;
+ this.cmd = cmd;
+ this.options = options;
+ this.keyspace = keyspace;
+ }
+
+ /**
+ * Registers a listener that will receive the progress events fired by this runnable.
+ * The listener is appended to a plain {@code ArrayList}.
+ * NOTE(review): there is no synchronization here; confirm listeners are only registered before the runnable starts.
+ */
+ @Override
+ public void addProgressListener(ProgressListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ /**
+ * Unregisters a listener so that it no longer receives progress events from this runnable.
+ * Removing a listener that was never registered leaves the list unchanged.
+ */
+ @Override
+ public void removeProgressListener(ProgressListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Delivers {@code event} under {@code tag} to every registered listener, in registration order.
+ *
+ * @param tag progress tag passed through to each listener
+ * @param event event to deliver
+ */
+ protected void fireProgressEvent(String tag, ProgressEvent event)
+ {
+ Iterator<ProgressListener> it = listeners.iterator();
+ while (it.hasNext())
+ {
+ it.next().progress(tag, event);
+ }
+ }
+
+ protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ final TraceState traceState;
+
+ final String tag = "repair:" + cmd;
+
+ final AtomicInteger progress = new AtomicInteger();
+ final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
+
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
+ columnFamilies);
+
+ final long startTime = System.currentTimeMillis();
+ String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
+ options);
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
+ if (options.isTraced())
+ {
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : validColumnFamilies)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+ cfsb.substring(2)));
+ Tracing.traceRepair(message);
+ traceState.enableActivityNotification(tag);
+ for (ProgressListener listener : listeners)
+ traceState.addProgressListener(listener);
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ }
+ else
+ {
+ traceState = null;
+ }
+
+ final Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
++
++ // Pre-calculate the output of getLocalRanges once and pass it to getNeighbors,
++ // so that it is not recalculated for every range being repaired
++ Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
++
+ try
+ {
+ for (Range<Token> range : options.getRanges())
+ {
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- options.getDataCenters(),
++ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
++ range, options.getDataCenters(),
+ options.getHosts());
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
+ }
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.error("Repair failed:", e);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ // Validate columnfamilies
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ try
+ {
+ Iterables.addAll(columnFamilyStores, validColumnFamilies);
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ String[] cfnames = new String[columnFamilyStores.size()];
+ for (int i = 0; i < columnFamilyStores.size(); i++)
+ {
+ cfnames[i] = columnFamilyStores.get(i).name;
+ }
+
+ final UUID parentSession = UUIDGen.getTimeUUID();
+ SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges());
+ long repairedAt;
+ try
+ {
+ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
+ progress.incrementAndGet();
+ }
+ catch (Throwable t)
+ {
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
+ return;
+ }
+
+ // Set up RepairJob executor for this repair command.
+ final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("Repair#" + cmd),
+ "internal"));
+
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+ for (Range<Token> range : options.getRanges())
+ {
+ final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ range,
+ keyspace,
+ options.getParallelism(),
+ rangeToNeighbors.get(range),
+ repairedAt,
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ // After repair session completes, notify client its result
+ Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
+ {
+ public void onSuccess(RepairSessionResult result)
+ {
+ /**
+ * If the success message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.getRange().toString());
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+
+ public void onFailure(Throwable t)
+ {
+ /**
+ * If the failure message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.getRange().toString(), t.getMessage());
+ logger.error(message, t);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+ });
+ futures.add(session);
+ }
+
+ // After all repair sessions complete (successfully or not),
+ // run anticompaction if necessary and send a finish notice back to the client
+ final Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ final AtomicBoolean hasFailure = new AtomicBoolean();
+ final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+ ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>()
+ {
+ @SuppressWarnings("unchecked")
+ public ListenableFuture apply(List<RepairSessionResult> results) throws Exception
+ {
+ // filter out null(=failed) results and get successful ranges
+ for (RepairSessionResult sessionResult : results)
+ {
+ if (sessionResult != null)
+ {
+ successfulRanges.add(sessionResult.range);
+ }
+ else
+ {
+ hasFailure.compareAndSet(false, true);
+ }
+ }
+ return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+ }
+ });
+ Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
+ {
+ public void onSuccess(Object result)
+ {
+ SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
+ if (hasFailure.get())
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+ "Some repair failed"));
+ }
+ else
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+ "Repair completed successfully"));
+ }
+ repairComplete();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ repairComplete();
+ }
+
+ private void repairComplete()
+ {
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+ true, true);
+ String message = String.format("Repair command #%d finished in %s", cmd, duration);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+ logger.info(message);
+ if (options.isTraced() && traceState != null)
+ {
+ for (ProgressListener listener : listeners)
+ traceState.removeProgressListener(listener);
+ // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+ // run in a nondeterministic order (within the same thread), the
+ // TraceState may have been nulled out at this point. The TraceState
+ // should be traceState, so just set it without bothering to check if it
+ // actually was nulled out.
+ Tracing.instance.set(traceState);
+ Tracing.traceRepair(message);
+ Tracing.instance.stopSession();
+ }
+ executor.shutdownNow();
+ }
+ });
+ }
+
+ /**
+ * Builds (but does not start) a thread that polls the trace events table for the given
+ * session and forwards events recorded by other nodes to the progress listeners as
+ * NOTIFICATION events. The loop ends when the trace state reports STOPPED.
+ *
+ * @param cmd repair command number; used to build the "repair:" progress tag
+ * @param sessionId tracing session whose events are polled
+ * @return the new, not yet started, polling thread
+ */
+ private Thread createQueryThread(final int cmd, final UUID sessionId)
+ {
+ return new Thread(new WrappedRunnable()
+ {
+ // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+ // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+ public void runMayThrow() throws Exception
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ if (state == null)
+ throw new Exception("no tracestate");
+
+ String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+ String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
+ SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+ ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ InetAddress source = FBUtilities.getBroadcastAddress();
+
+ // Two alternating sets: seen[si] collects event ids from the current poll, the other one
+ // holds the ids from the previous poll and is used to drop duplicates in the overlap.
+ HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+ int si = 0;
+ UUID uuid;
+
+ long tlast = System.currentTimeMillis(), tcur;
+
+ TraceState.Status status;
+ long minWaitMillis = 125;
+ long maxWaitMillis = 1000 * 1024L;
+ long timeout = minWaitMillis;
+ boolean shouldDouble = false;
+
+ while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+ {
+ if (status == TraceState.Status.IDLE)
+ {
+ // Back off: the timeout is doubled on every second consecutive IDLE result, capped at maxWaitMillis.
+ timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+ shouldDouble = !shouldDouble;
+ }
+ else
+ {
+ // Any other status resets the back-off to the minimum wait.
+ timeout = minWaitMillis;
+ shouldDouble = false;
+ }
+ // Query window: from one second before the previous poll up to now (tcur is assigned here).
+ ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+ ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
+ tminBytes,
+ tmaxBytes));
+ ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+ UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+ for (UntypedResultSet.Row r : result)
+ {
+ // Skip events whose source is this node's broadcast address.
+ if (source.equals(r.getInetAddress("source")))
+ continue;
+ // Remember events from the last second of the window so the next poll can recognise them.
+ // NOTE(review): java.util.UUID.timestamp() is documented as 100-ns units since 1582-10-15, while
+ // (tcur - 1000) * 10000 is 100-ns units since the Unix epoch; if that reading is right this
+ // condition is always true and every non-local event is remembered - confirm.
+ if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
+ seen[si].add(uuid);
+ // Already reported during the previous poll: do not notify again.
+ if (seen[si == 0 ? 1 : 0].contains(uuid))
+ continue;
+ String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+ fireProgressEvent("repair:" + cmd,
+ new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+ }
+ tlast = tcur;
+
+ // Swap the roles of the two sets and empty the one that becomes "current".
+ si = si == 0 ? 1 : 0;
+ seen[si].clear();
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 7793660,26e5126..f34d0e2
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -50,180 -43,11 +50,182 @@@ import org.apache.cassandra.utils.concu
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-public class ActiveRepairServiceTest extends SchemaLoader
+public class ActiveRepairServiceTest
{
+ public static final String KEYSPACE5 = "Keyspace5";
+ public static final String CF_STANDARD1 = "Standard1";
+ public static final String CF_COUNTER = "Counter1";
- private static final String KEYSPACE1 = "Keyspace1";
- private static final String CF = "Standard1";
+ public String cfname;
+ public ColumnFamilyStore store;
+ public InetAddress LOCAL, REMOTE;
+
+ private boolean initialized;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE5,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(2),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1));
+ }
+
+ @Before
+ public void prepare() throws Exception
+ {
+ if (!initialized)
+ {
+ SchemaLoader.startGossiper();
+ initialized = true;
+
+ LOCAL = FBUtilities.getBroadcastAddress();
+ // generate a fake endpoint for which we can spoof receiving/sending trees
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ }
+
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
+ tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
+ assert tmd.isMember(REMOTE);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOne() throws Throwable
+ {
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwo() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ List<InetAddress> expected = new ArrayList<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5,
- StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
- null, hosts).iterator().next());
++ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
++ ranges.iterator().next(),
++ null, hosts).iterator().next());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
+ {
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ //Dont give local endpoint
+ Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts);
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
++ ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts);
+ }
+
+ Set<InetAddress> addTokens(int max) throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ Set<InetAddress> endpoints = new HashSet<>();
+ for (int i = 1; i <= max; i++)
+ {
+ InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+ tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
+ endpoints.add(endpoint);
+ }
+ return endpoints;
+ }
@Test
public void testGetActiveRepairedSSTableRefs()
[02/10] cassandra git commit: Cache local ranges when calculating
repair neighbors
Posted by sn...@apache.org.
Cache local ranges when calculating repair neighbors
patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f
Branch: refs/heads/cassandra-2.2
Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed
Parents: 1d2d074
Author: Mahdi Mohammadi <ma...@gmail.com>
Authored: Wed Jun 15 11:43:27 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:43:27 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/ActiveRepairService.java | 8 +++++---
.../org/apache/cassandra/service/StorageService.java | 6 +++++-
.../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d70902..ec2b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f8975f9..4c83c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
Set<InetAddress> neighbours = new HashSet<>();
- neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
+ Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace);
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null));
RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
@@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* Return all of the neighbors with whom we share the provided range.
*
* @param keyspaceName keyspace to repair
+ * @param keyspaceLocalRanges local ranges for the given keyspace, as returned by StorageService#getLocalRanges
* @param toRepair token to repair
* @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
- for (Range<Token> range : ss.getLocalRanges(keyspaceName))
+ for (Range<Token> range : keyspaceLocalRanges)
{
if (range.contains(toRepair))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eea4556..27939f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
+ // pre-calculate the result of getLocalRanges once and pass it to getNeighbors, so that it is not
+ // recalculated for every range in the loop below
+ Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace);
+
Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
try
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts);
rangeToNeighbors.put(range, neighbors);
allNeighbors.addAll(neighbors);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ac39de6..21eb492 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next());
}
@Test(expected = IllegalArgumentException.class)
@@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
//Dont give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts);
}
Set<InetAddress> addTokens(int max) throws Throwable
[10/10] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by sn...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4210a25a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4210a25a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4210a25a
Branch: refs/heads/trunk
Commit: 4210a25a909c629c19db5d91de671c2a0f9fa28b
Parents: 4ed0060 70ee4ed
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:47:37 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:47:37 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../apache/cassandra/repair/RepairRunnable.java | 7 ++++++-
.../cassandra/service/ActiveRepairService.java | 7 +++++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4210a25a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index dbcfa34,3eb420d..c9fa673
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -20,11 -8,8 +21,10 @@@ Merged from 2.1
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
--
-3.0.7
+3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
+ * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
+Merged from 3.0:
* Fix legacy serialization of Thrift-generated non-compound range tombstones
when communicating with 2.x nodes (CASSANDRA-11930)
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4210a25a/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4210a25a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
[04/10] cassandra git commit: Cache local ranges when calculating
repair neighbors
Posted by sn...@apache.org.
Cache local ranges when calculating repair neighbors
patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f
Branch: refs/heads/trunk
Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed
Parents: 1d2d074
Author: Mahdi Mohammadi <ma...@gmail.com>
Authored: Wed Jun 15 11:43:27 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:43:27 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/ActiveRepairService.java | 8 +++++---
.../org/apache/cassandra/service/StorageService.java | 6 +++++-
.../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d70902..ec2b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f8975f9..4c83c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
Set<InetAddress> neighbours = new HashSet<>();
- neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
+ Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace);
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null));
RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
@@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* Return all of the neighbors with whom we share the provided range.
*
* @param keyspaceName keyspace to repair
+ * @param keyspaceLocalRanges local ranges for the given keyspace, as returned by StorageService#getLocalRanges
* @param toRepair token to repair
* @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
- for (Range<Token> range : ss.getLocalRanges(keyspaceName))
+ for (Range<Token> range : keyspaceLocalRanges)
{
if (range.contains(toRepair))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eea4556..27939f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
+ // pre-calculate the result of getLocalRanges once and pass it to getNeighbors, so that it is not
+ // recalculated for every range in the loop below
+ Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace);
+
Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
try
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts);
rangeToNeighbors.put(range, neighbors);
allNeighbors.addAll(neighbors);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ac39de6..21eb492 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next());
}
@Test(expected = IllegalArgumentException.class)
@@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
//Dont give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts);
}
Set<InetAddress> addTokens(int max) throws Throwable
[06/10] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by sn...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2566d1c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2566d1c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2566d1c
Branch: refs/heads/cassandra-2.2
Commit: c2566d1cf9e239063a530ec08e8e098feffe387b
Parents: 52a827b 2f74831
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:44:57 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:44:57 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 9 +++++++--
.../cassandra/service/ActiveRepairService.java | 5 +++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d0ca37f,ec2b48e..d9afaa3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-2.1.15
+2.2.7
+ * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+ * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
+ * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
+ * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
+ * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
+ * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
+ * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+ * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
+ * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
+ * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
+ * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
+ * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
+ * JSON datetime formatting needs timezone (CASSANDRA-11137)
+ * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
+ * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
+ * Add missing files to debian packages (CASSANDRA-11642)
+ * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
+ * cqlsh: COPY FROM should use regular inserts for single statement batches and
+ report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
+ * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
+ * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
+Merged from 2.1:
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index b849cf8,0000000..f92310b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,409 -1,0 +1,414 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+{
+ private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
+
+ private StorageService storageService;
+ private final int cmd;
+ private final RepairOption options;
+ private final String keyspace;
+
+ private final List<ProgressListener> listeners = new ArrayList<>();
+
+ public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
+ {
+ this.storageService = storageService;
+ this.cmd = cmd;
+ this.options = options;
+ this.keyspace = keyspace;
+ }
+
+ @Override
+ public void addProgressListener(ProgressListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeProgressListener(ProgressListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ protected void fireProgressEvent(String tag, ProgressEvent event)
+ {
+ for (ProgressListener listener : listeners)
+ {
+ listener.progress(tag, event);
+ }
+ }
+
+ protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ final TraceState traceState;
+
+ final String tag = "repair:" + cmd;
+
+ final AtomicInteger progress = new AtomicInteger();
+ final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
+
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
+ columnFamilies);
+
+ final long startTime = System.currentTimeMillis();
+ String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
+ options);
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
+ if (options.isTraced())
+ {
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : validColumnFamilies)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+ cfsb.substring(2)));
+ Tracing.traceRepair(message);
+ traceState.enableActivityNotification(tag);
+ for (ProgressListener listener : listeners)
+ traceState.addProgressListener(listener);
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ }
+ else
+ {
+ traceState = null;
+ }
+
+ final Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
++
++ //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
++ //calculation multiple times
++ Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
++
+ try
+ {
+ for (Range<Token> range : options.getRanges())
+ {
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- options.getDataCenters(),
++ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
++ range, options.getDataCenters(),
+ options.getHosts());
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
+ }
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.error("Repair failed:", e);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ // Validate columnfamilies
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ try
+ {
+ Iterables.addAll(columnFamilyStores, validColumnFamilies);
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ String[] cfnames = new String[columnFamilyStores.size()];
+ for (int i = 0; i < columnFamilyStores.size(); i++)
+ {
+ cfnames[i] = columnFamilyStores.get(i).name;
+ }
+
+ final UUID parentSession = UUIDGen.getTimeUUID();
+ SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges());
+ long repairedAt;
+ try
+ {
+ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
+ progress.incrementAndGet();
+ }
+ catch (Throwable t)
+ {
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
+ return;
+ }
+
+ // Set up RepairJob executor for this repair command.
+ final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("Repair#" + cmd),
+ "internal"));
+
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+ for (Range<Token> range : options.getRanges())
+ {
+ final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ range,
+ keyspace,
+ options.getParallelism(),
+ rangeToNeighbors.get(range),
+ repairedAt,
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ // After repair session completes, notify client its result
+ Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
+ {
+ public void onSuccess(RepairSessionResult result)
+ {
+ /**
+ * If the success message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.getRange().toString());
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+
+ public void onFailure(Throwable t)
+ {
+ /**
+ * If the failure message below is modified, it must also be updated on
+ * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+ * for backward-compatibility support.
+ */
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.getRange().toString(), t.getMessage());
+ logger.error(message, t);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+ });
+ futures.add(session);
+ }
+
+ // After all repair sessions completes(successful or not),
+ // run anticompaction if necessary and send finish notice back to client
+ final Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ final AtomicBoolean hasFailure = new AtomicBoolean();
+ final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+ ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>()
+ {
+ @SuppressWarnings("unchecked")
+ public ListenableFuture apply(List<RepairSessionResult> results) throws Exception
+ {
+ // filter out null(=failed) results and get successful ranges
+ for (RepairSessionResult sessionResult : results)
+ {
+ if (sessionResult != null)
+ {
+ successfulRanges.add(sessionResult.range);
+ }
+ else
+ {
+ hasFailure.compareAndSet(false, true);
+ }
+ }
+ return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+ }
+ });
+ Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
+ {
+ public void onSuccess(Object result)
+ {
+ SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
+ if (hasFailure.get())
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+ "Some repair failed"));
+ }
+ else
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+ "Repair completed successfully"));
+ }
+ repairComplete();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
+ SystemDistributedKeyspace.failParentRepair(parentSession, t);
+ repairComplete();
+ }
+
+ private void repairComplete()
+ {
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+ true, true);
+ String message = String.format("Repair command #%d finished in %s", cmd, duration);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+ logger.info(message);
+ if (options.isTraced() && traceState != null)
+ {
+ for (ProgressListener listener : listeners)
+ traceState.removeProgressListener(listener);
+ // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+ // run in a nondeterministic order (within the same thread), the
+ // TraceState may have been nulled out at this point. The TraceState
+ // should be traceState, so just set it without bothering to check if it
+ // actually was nulled out.
+ Tracing.instance.set(traceState);
+ Tracing.traceRepair(message);
+ Tracing.instance.stopSession();
+ }
+ executor.shutdownNow();
+ }
+ });
+ }
+
+ private Thread createQueryThread(final int cmd, final UUID sessionId)
+ {
+ return new Thread(new WrappedRunnable()
+ {
+ // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+ // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+ public void runMayThrow() throws Exception
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ if (state == null)
+ throw new Exception("no tracestate");
+
+ String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+ String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
+ SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+ ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ InetAddress source = FBUtilities.getBroadcastAddress();
+
+ HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+ int si = 0;
+ UUID uuid;
+
+ long tlast = System.currentTimeMillis(), tcur;
+
+ TraceState.Status status;
+ long minWaitMillis = 125;
+ long maxWaitMillis = 1000 * 1024L;
+ long timeout = minWaitMillis;
+ boolean shouldDouble = false;
+
+ while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+ {
+ if (status == TraceState.Status.IDLE)
+ {
+ timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+ shouldDouble = !shouldDouble;
+ }
+ else
+ {
+ timeout = minWaitMillis;
+ shouldDouble = false;
+ }
+ ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+ ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
+ tminBytes,
+ tmaxBytes));
+ ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+ UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+ for (UntypedResultSet.Row r : result)
+ {
+ if (source.equals(r.getInetAddress("source")))
+ continue;
+ if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
+ seen[si].add(uuid);
+ if (seen[si == 0 ? 1 : 0].contains(uuid))
+ continue;
+ String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+ fireProgressEvent("repair:" + cmd,
+ new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+ }
+ tlast = tcur;
+
+ si = si == 0 ? 1 : 0;
+ seen[si].clear();
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 7793660,26e5126..f34d0e2
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -50,180 -43,11 +50,182 @@@ import org.apache.cassandra.utils.concu
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-public class ActiveRepairServiceTest extends SchemaLoader
+public class ActiveRepairServiceTest
{
+ public static final String KEYSPACE5 = "Keyspace5";
+ public static final String CF_STANDARD1 = "Standard1";
+ public static final String CF_COUNTER = "Counter1";
- private static final String KEYSPACE1 = "Keyspace1";
- private static final String CF = "Standard1";
+ public String cfname;
+ public ColumnFamilyStore store;
+ public InetAddress LOCAL, REMOTE;
+
+ private boolean initialized;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE5,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(2),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER),
+ SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1));
+ }
+
+ @Before
+ public void prepare() throws Exception
+ {
+ if (!initialized)
+ {
+ SchemaLoader.startGossiper();
+ initialized = true;
+
+ LOCAL = FBUtilities.getBroadcastAddress();
+ // generate a fake endpoint for which we can spoof receiving/sending trees
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ }
+
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
+ tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
+ assert tmd.isMember(REMOTE);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOne() throws Throwable
+ {
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwo() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Set<InetAddress> neighbors = new HashSet<>();
+ for (Range<Token> range : ranges)
+ {
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
+ List<InetAddress> expected = new ArrayList<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5,
- StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
- null, hosts).iterator().next());
++ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
++ ranges.iterator().next(),
++ null, hosts).iterator().next());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
+ {
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ //Dont give local endpoint
+ Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts);
++ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
++ ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts);
+ }
+
+ Set<InetAddress> addTokens(int max) throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ Set<InetAddress> endpoints = new HashSet<>();
+ for (int i = 1; i <= max; i++)
+ {
+ InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+ tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
+ endpoints.add(endpoint);
+ }
+ return endpoints;
+ }
@Test
public void testGetActiveRepairedSSTableRefs()
[09/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by sn...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70ee4ed4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70ee4ed4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70ee4ed4
Branch: refs/heads/trunk
Commit: 70ee4ed49a01a6466f8156c69f383ebc1d3b0e29
Parents: 3c531cb c2566d1
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 15 11:46:25 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:46:25 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 7 ++++++-
.../cassandra/service/ActiveRepairService.java | 7 +++++--
.../service/ActiveRepairServiceTest.java | 18 ++++++++++--------
4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2c42c94,d9afaa3..3eb420d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-2.2.7
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+Merged from 2.2:
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+Merged from 2.1:
++ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
+ * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
+ * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
+
+
+3.0.7
+ * Fix legacy serialization of Thrift-generated non-compound range tombstones
+ when communicating with 2.x nodes (CASSANDRA-11930)
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
* Persist local metadata earlier in startup sequence (CASSANDRA-11742)
* Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index 741cada,f92310b..21d0cd6
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -147,19 -146,22 +147,24 @@@ public class RepairRunnable extends Wra
}
final Set<InetAddress> allNeighbors = new HashSet<>();
- Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+ List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>();
+
+ //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
+ //calculation multiple times
+ Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
+
try
{
for (Range<Token> range : options.getRanges())
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
- range, options.getDataCenters(),
- options.getHosts());
- rangeToNeighbors.put(range, neighbors);
- allNeighbors.addAll(neighbors);
++ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+ options.getDataCenters(),
+ options.getHosts());
+
+ addRangeToNeighbors(commonRanges, range, neighbors);
+ allNeighbors.addAll(neighbors);
}
+
progress.incrementAndGet();
}
catch (IllegalArgumentException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 9f249e4,0bb7172..6b1fd83
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -182,7 -183,7 +183,9 @@@ public class ActiveRepairService implem
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
- public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
++ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges,
++ Range<Token> toRepair, Collection<String> dataCenters,
++ Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70ee4ed4/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
[03/10] cassandra git commit: Cache local ranges when calculating
repair neighbors
Posted by sn...@apache.org.
Cache local ranges when calculating repair neighbors
patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f
Branch: refs/heads/cassandra-3.0
Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed
Parents: 1d2d074
Author: Mahdi Mohammadi <ma...@gmail.com>
Authored: Wed Jun 15 11:43:27 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:43:27 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/ActiveRepairService.java | 8 +++++---
.../org/apache/cassandra/service/StorageService.java | 6 +++++-
.../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d70902..ec2b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f8975f9..4c83c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
Set<InetAddress> neighbours = new HashSet<>();
- neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
+ Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace);
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null));
RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
@@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* Return all of the neighbors with whom we share the provided range.
*
* @param keyspaceName keyspace to repair
+ * @param keyspaceLocalRanges local ranges for the given keyspaceName
* @param toRepair token to repair
* @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
- for (Range<Token> range : ss.getLocalRanges(keyspaceName))
+ for (Range<Token> range : keyspaceLocalRanges)
{
if (range.contains(toRepair))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eea4556..27939f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
+ //pre-calculate the output of getLocalRanges once and pass it to getNeighbors, so that it is not
+ //recalculated for every range being repaired
+ Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace);
+
Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
try
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts);
rangeToNeighbors.put(range, neighbors);
allNeighbors.addAll(neighbors);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ac39de6..21eb492 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next());
}
@Test(expected = IllegalArgumentException.class)
@@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
//Don't give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts);
}
Set<InetAddress> addTokens(int max) throws Throwable