You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:04 UTC
[12/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 545ad28..7a000b7 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
@@ -36,34 +36,41 @@ import org.apache.cassandra.dht.Token;
*/
public class SimpleStrategy extends AbstractReplicationStrategy
{
+ private final ReplicationFactor rf;
+
public SimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
super(keyspaceName, tokenMetadata, snitch, configOptions);
+ this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor"));
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- int replicas = getReplicationFactor();
- ArrayList<Token> tokens = metadata.sortedTokens();
- List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas);
+ ArrayList<Token> ring = metadata.sortedTokens();
+ if (ring.isEmpty())
+ return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken()));
+
+ Token replicaEnd = TokenMetadata.firstToken(ring, token);
+ Token replicaStart = metadata.getPredecessor(replicaEnd);
+ Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd);
+ Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false);
- if (tokens.isEmpty())
- return endpoints;
+ EndpointsForRange.Builder replicas = EndpointsForRange.builder(replicaRange, rf.allReplicas);
// Add the token at the index by default
- Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
- while (endpoints.size() < replicas && iter.hasNext())
+ while (replicas.size() < rf.allReplicas && iter.hasNext())
{
- InetAddressAndPort ep = metadata.getEndpoint(iter.next());
- if (!endpoints.contains(ep))
- endpoints.add(ep);
+ Token tk = iter.next();
+ InetAddressAndPort ep = metadata.getEndpoint(tk);
+ if (!replicas.containsEndpoint(ep))
+ replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas));
}
- return endpoints;
+ return replicas.build();
}
- public int getReplicationFactor()
+ public ReplicationFactor getReplicationFactor()
{
- return Integer.parseInt(this.configOptions.get("replication_factor"));
+ return rf;
}
public void validateOptions() throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
new file mode 100644
index 0000000..13a9d74
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class SystemReplicas
+{
+ private static final Map<InetAddressAndPort, Replica> systemReplicas = new ConcurrentHashMap<>();
+ public static final Range<Token> FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
+ DatabaseDescriptor.getPartitioner().getMinimumToken());
+
+ private static Replica createSystemReplica(InetAddressAndPort endpoint)
+ {
+ return new Replica(endpoint, FULL_RANGE, true);
+ }
+
+ /**
+ * There are a few places where a system function borrows write path functionality, but doesn't otherwise
+ * fit into normal replication strategies (i.e., hints and batchlog). So here we provide a replica instance
+ * @param endpoint
+ * @return
+ */
+ public static Replica getSystemReplica(InetAddressAndPort endpoint)
+ {
+ return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
+ }
+
+ public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+ {
+ List<Replica> replicas = new ArrayList<>(endpoints.size());
+ for (InetAddressAndPort endpoint: endpoints)
+ {
+ replicas.add(getSystemReplica(endpoint));
+ }
+ return replicas;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 46c191f..4ab34db 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +88,7 @@ public class TokenMetadata
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
+ // NOTE: this may contain ranges that conflict with those implied by sortedTokens when a range is changing its transient status
private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>();
// nodes which are migrating to the new tokens in the ring
@@ -733,24 +735,20 @@ public class TokenMetadata
return sortedTokens;
}
- public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName)
+ public EndpointsByRange getPendingRangesMM(String keyspaceName)
{
- Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
+ EndpointsByRange.Mutable byRange = new EndpointsByRange.Mutable();
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps != null)
{
- for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps)
+ for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : pendingRangeMaps)
{
- Range<Token> range = entry.getKey();
- for (InetAddressAndPort address : entry.getValue())
- {
- map.put(range, address);
- }
+ byRange.putAll(entry.getKey(), entry.getValue(), Conflict.ALL);
}
}
- return map;
+ return byRange.asImmutableView();
}
/** a mutable map may be returned but caller should not modify it */
@@ -759,17 +757,18 @@ public class TokenMetadata
return this.pendingRanges.get(keyspaceName);
}
- public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
+ public RangesAtEndpoint getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
{
- List<Range<Token>> ranges = new ArrayList<>();
- for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries())
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+ for (Map.Entry<Range<Token>, Replica> entry : getPendingRangesMM(keyspaceName).flattenEntries())
{
- if (entry.getValue().equals(endpoint))
+ Replica replica = entry.getValue();
+ if (replica.endpoint().equals(endpoint))
{
- ranges.add(entry.getKey());
+ builder.add(replica);
}
}
- return ranges;
+ return builder.build();
}
/**
@@ -858,25 +857,27 @@ public class TokenMetadata
{
PendingRangeMaps newPendingRanges = new PendingRangeMaps();
- Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
+ RangesByEndpoint addressRanges = strategy.getAddressReplicas(metadata);
// Copy of metadata reflecting the situation after all leave operations are finished.
TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
// get all ranges that will be affected by leaving nodes
- Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+ Set<Range<Token>> removeAffectedRanges = new HashSet<>();
for (InetAddressAndPort endpoint : leavingEndpoints)
- affectedRanges.addAll(addressRanges.get(endpoint));
+ removeAffectedRanges.addAll(addressRanges.get(endpoint).ranges());
// for each of those ranges, find what new nodes will be responsible for the range when
// all leaving nodes are gone.
- for (Range<Token> range : affectedRanges)
+ for (Range<Token> range : removeAffectedRanges)
{
- Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints))
+ EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata);
+ EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata);
+ for (Replica replica : newReplicas)
{
- newPendingRanges.addPendingRange(range, address);
+ if (currentReplicas.endpoints().contains(replica.endpoint()))
+ continue;
+ newPendingRanges.addPendingRange(range, replica);
}
}
@@ -891,9 +892,9 @@ public class TokenMetadata
Collection<Token> tokens = bootstrapAddresses.get(endpoint);
allLeftMetadata.updateNormalTokens(tokens, endpoint);
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
{
- newPendingRanges.addPendingRange(range, endpoint);
+ newPendingRanges.addPendingRange(replica.range(), replica);
}
allLeftMetadata.removeEndpoint(endpoint);
}
@@ -906,38 +907,43 @@ public class TokenMetadata
for (Pair<Token, InetAddressAndPort> moving : movingEndpoints)
{
//Calculate all the ranges which could be affected. This will include the ranges before and after the move.
- Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+ Set<Replica> moveAffectedReplicas = new HashSet<>();
InetAddressAndPort endpoint = moving.right; // address of the moving node
//Add ranges before the move
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
{
- moveAffectedRanges.add(range);
+ moveAffectedReplicas.add(replica);
}
allLeftMetadata.updateNormalToken(moving.left, endpoint);
//Add ranges after the move
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
{
- moveAffectedRanges.add(range);
+ moveAffectedReplicas.add(replica);
}
- for(Range<Token> range : moveAffectedRanges)
+ for (Replica replica : moveAffectedReplicas)
{
- Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ Set<InetAddressAndPort> currentEndpoints = strategy.calculateNaturalReplicas(replica.range().right, metadata).endpoints();
+ Set<InetAddressAndPort> newEndpoints = strategy.calculateNaturalReplicas(replica.range().right, allLeftMetadata).endpoints();
Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints);
- for(final InetAddressAndPort address : difference)
+ for (final InetAddressAndPort address : difference)
{
- Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
- Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
- //We want to get rid of any ranges which the node is currently getting.
- newRanges.removeAll(oldRanges);
+ RangesAtEndpoint newReplicas = strategy.getAddressReplicas(allLeftMetadata, address);
+ RangesAtEndpoint oldReplicas = strategy.getAddressReplicas(metadata, address);
- for(Range<Token> newRange : newRanges)
+ // Filter out the things that are already replicated
+ newReplicas = newReplicas.filter(r -> !oldReplicas.contains(r));
+ for (Replica newReplica : newReplicas)
{
- for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
+ // for correctness on write, we need to treat ranges that are becoming full differently
+ // to those that are presently transient; however reads must continue to use the current view
+ // for ranges that are becoming transient. We could choose to ignore them here, but it's probably
+ // cleaner to ensure this is dealt with at point of use, where we can make a conscious decision
+ // about which to use
+ for (Replica pendingReplica : newReplica.subtractSameReplication(oldReplicas))
{
- newPendingRanges.addPendingRange(pendingRange, address);
+ newPendingRanges.addPendingRange(pendingReplica.range(), pendingReplica);
}
}
}
@@ -1206,11 +1212,11 @@ public class TokenMetadata
return sb.toString();
}
- public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName)
+ public EndpointsForToken pendingEndpointsForToken(Token token, String keyspaceName)
{
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps == null)
- return Collections.emptyList();
+ return EndpointsForToken.empty(token);
return pendingRangeMaps.pendingEndpointsFor(token);
}
@@ -1218,9 +1224,15 @@ public class TokenMetadata
/**
* @deprecated retained for benefit of old tests
*/
- public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints)
+ public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
{
- return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
+ EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
+ if (Endpoints.haveConflicts(natural, pending))
+ {
+ natural = Endpoints.resolveConflictsInNatural(natural, pending);
+ pending = Endpoints.resolveConflictsInPending(natural, pending);
+ }
+ return Endpoints.concat(natural, pending);
}
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 9e8d542..5a90804 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -102,6 +102,8 @@ public class KeyspaceMetrics
public final Counter speculativeFailedRetries;
/** Needed to speculate, but didn't have enough replicas **/
public final Counter speculativeInsufficientReplicas;
+ /** Needed to write to a transient replica to satisfy quorum **/
+ public final Counter speculativeWrites;
/** Number of started repairs as coordinator on this keyspace */
public final Counter repairsStarted;
/** Number of completed repairs as coordinator on this keyspace */
@@ -268,41 +270,12 @@ public class KeyspaceMetrics
writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL"));
idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite");
- speculativeRetries = createKeyspaceCounter("SpeculativeRetries", new MetricValue()
- {
- public Long getValue(TableMetrics metric)
- {
- return metric.speculativeRetries.getCount();
- }
- });
- speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new MetricValue()
- {
- public Long getValue(TableMetrics metric)
- {
- return metric.speculativeFailedRetries.getCount();
- }
- });
- speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", new MetricValue()
- {
- public Long getValue(TableMetrics metric)
- {
- return metric.speculativeInsufficientReplicas.getCount();
- }
- });
- repairsStarted = createKeyspaceCounter("RepairJobsStarted", new MetricValue()
- {
- public Long getValue(TableMetrics metric)
- {
- return metric.repairsStarted.getCount();
- }
- });
- repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", new MetricValue()
- {
- public Long getValue(TableMetrics metric)
- {
- return metric.repairsCompleted.getCount();
- }
- });
+ speculativeRetries = createKeyspaceCounter("SpeculativeRetries", metric -> metric.speculativeRetries.getCount());
+ speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", metric -> metric.speculativeFailedRetries.getCount());
+ speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", metric -> metric.speculativeInsufficientReplicas.getCount());
+ speculativeWrites = createKeyspaceCounter("SpeculativeWrites", metric -> metric.speculativeWrites.getCount());
+ repairsStarted = createKeyspaceCounter("RepairJobsStarted", metric -> metric.repairsStarted.getCount());
+ repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", metric -> metric.repairsCompleted.getCount());
repairTime = Metrics.timer(factory.createMetricName("RepairTime"));
repairPrepareTime = Metrics.timer(factory.createMetricName("RepairPrepareTime"));
anticompactionTime = Metrics.timer(factory.createMetricName("AntiCompactionTime"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
index fe7673d..3d00b12 100644
--- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
@@ -37,6 +37,7 @@ public class ReadRepairMetrics
@Deprecated
public static final Meter attempted = Metrics.meter(factory.createMetricName("Attempted"));
+ // Incremented when additional requests were sent during blocking read repair due to unavailable or slow nodes
public static final Meter speculatedRead = Metrics.meter(factory.createMetricName("SpeculatedRead"));
public static final Meter speculatedWrite = Metrics.meter(factory.createMetricName("SpeculatedWrite"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 49603ba..53ebcb0 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -214,6 +214,9 @@ public class TableMetrics
public final Counter speculativeInsufficientReplicas;
public final Gauge<Long> speculativeSampleLatencyNanos;
+ public final Counter speculativeWrites;
+ public final Gauge<Long> speculativeWriteLatencyNanos;
+
public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -239,7 +242,7 @@ public class TableMetrics
Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
continue;
- if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ if (k.getReplicationStrategy().getReplicationFactor().allReplicas < 2)
continue;
for (ColumnFamilyStore cf : k.getColumnFamilyStores())
@@ -825,13 +828,11 @@ public class TableMetrics
speculativeRetries = createTableCounter("SpeculativeRetries");
speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
- speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", new Gauge<Long>()
- {
- public Long getValue()
- {
- return cfs.sampleLatencyNanos;
- }
- });
+ speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> cfs.sampleReadLatencyNanos);
+
+ speculativeWrites = createTableCounter("SpeculativeWrites");
+ speculativeWriteLatencyNanos = createTableGauge("SpeculativeWriteLatencyNanos", () -> cfs.transientWriteLatencyNanos);
+
keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"),
aliasFactory.createMetricName("KeyCacheHitRate"),
new RatioGauge()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 251d263..253b412 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -21,6 +21,7 @@ import com.google.common.base.Predicate;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
/**
* implementors of IAsyncCallback need to make sure that any public methods
@@ -30,13 +31,9 @@ import org.apache.cassandra.locator.InetAddressAndPort;
*/
public interface IAsyncCallback<T>
{
- Predicate<InetAddressAndPort> isAlive = new Predicate<InetAddressAndPort>()
- {
- public boolean apply(InetAddressAndPort endpoint)
- {
- return FailureDetector.instance.isAlive(endpoint);
- }
- };
+ final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
+
+ final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
/**
* @param msg response received.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c8fe3b7..bd290a1 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -90,6 +90,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
@@ -604,8 +605,9 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- Mutation mutation = ((WriteCallbackInfo) expiredCallbackInfo).mutation();
- return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
+ WriteCallbackInfo writeCallbackInfo = ((WriteCallbackInfo) expiredCallbackInfo);
+ Mutation mutation = writeCallbackInfo.mutation();
+ return StorageProxy.submitHint(mutation, writeCallbackInfo.getReplica(), null);
}
return null;
@@ -961,7 +963,7 @@ public final class MessagingService implements MessagingServiceMBean
return verbHandlers.get(type);
}
- public int addCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback)
+ public int addWriteCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback)
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
@@ -970,12 +972,12 @@ public final class MessagingService implements MessagingServiceMBean
return messageId;
}
- public int addCallback(IAsyncCallback cb,
- MessageOut<?> message,
- InetAddressAndPort to,
- long timeout,
- ConsistencyLevel consistencyLevel,
- boolean allowHints)
+ public int addWriteCallback(IAsyncCallback cb,
+ MessageOut<?> message,
+ Replica to,
+ long timeout,
+ ConsistencyLevel consistencyLevel,
+ boolean allowHints)
{
assert message.verb == Verb.MUTATION
|| message.verb == Verb.COUNTER_MUTATION
@@ -1024,7 +1026,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public int sendRR(MessageOut message, InetAddressAndPort to, IAsyncCallback cb, long timeout, boolean failureCallback)
{
- int id = addCallback(cb, message, to, timeout, failureCallback);
+ int id = addWriteCallback(cb, message, to, timeout, failureCallback);
updateBackPressureOnSend(to, cb, message);
sendOneWay(failureCallback ? message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE) : message, id, to);
return id;
@@ -1042,14 +1044,14 @@ public final class MessagingService implements MessagingServiceMBean
* suggest that a timeout occurred to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut<?> message,
- InetAddressAndPort to,
- AbstractWriteResponseHandler<?> handler,
- boolean allowHints)
- {
- int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
- updateBackPressureOnSend(to, handler, message);
- sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to);
+ public int sendWriteRR(MessageOut<?> message,
+ Replica to,
+ AbstractWriteResponseHandler<?> handler,
+ boolean allowHints)
+ {
+ int id = addWriteCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel(), allowHints);
+ updateBackPressureOnSend(to.endpoint(), handler, message);
+ sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to.endpoint());
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 41ac31b..c54e7dc 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.net;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.FBUtilities;
@@ -30,24 +30,31 @@ public class WriteCallbackInfo extends CallbackInfo
{
// either a Mutation, or a Paxos Commit (MessageOut)
private final Object mutation;
+ private final Replica replica;
- public WriteCallbackInfo(InetAddressAndPort target,
+ public WriteCallbackInfo(Replica replica,
IAsyncCallback callback,
MessageOut message,
IVersionedSerializer<?> serializer,
ConsistencyLevel consistencyLevel,
boolean allowHints)
{
- super(target, callback, serializer, true);
+ super(replica.endpoint(), callback, serializer, true);
assert message != null;
this.mutation = shouldHint(allowHints, message, consistencyLevel);
//Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
assert (!target.equals(FBUtilities.getBroadcastAddressAndPort()));
+ this.replica = replica;
}
public boolean shouldHint()
{
- return mutation != null && StorageProxy.shouldHint(target);
+ return mutation != null && StorageProxy.shouldHint(replica);
+ }
+
+ public Replica getReplica()
+ {
+ return replica;
}
public Mutation mutation()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
new file mode 100644
index 0000000..124baa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+ protected abstract void startSync(List<Range<Token>> rangesToStream);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 2ca524f..eaf890a 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -18,12 +18,14 @@
package org.apache.cassandra.repair;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
@@ -54,8 +56,9 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null)
- // request ranges from the remote node
- .requestRanges(fetchFrom, desc.keyspace, rangesToFetch, desc.columnFamily);
+ // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+ .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch),
+ RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
plan.execute();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index e24d854..2b171c9 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
{
@@ -37,6 +38,10 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
{
super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
}
+ public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind)
+ {
+ this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind);
+ }
public void startSync(List<Range<Token>> rangesToFetch)
{
@@ -46,6 +51,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
Tracing.traceRepair(message);
MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
}
+
public void syncComplete(boolean success, List<SessionSummary> summaries)
{
if (success)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
index 4d38e8a..35474af 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -21,8 +21,6 @@ package org.apache.cassandra.repair;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import com.google.common.util.concurrent.AbstractFuture;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +31,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
-public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable
+public abstract class AsymmetricSyncTask extends AbstractSyncTask
{
private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
protected final RepairJobDesc desc;
@@ -44,9 +42,9 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
private long startTime = Long.MIN_VALUE;
protected volatile SyncStat stat;
-
public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
{
+ assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom;
this.desc = desc;
this.fetchFrom = fetchFrom;
this.fetchingNode = fetchingNode;
@@ -55,6 +53,7 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
this.previewKind = previewKind;
}
+
public void run()
{
startTime = System.currentTimeMillis();
@@ -79,7 +78,4 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
if (startTime != Long.MIN_VALUE)
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
-
-
- public abstract void startSync(List<Range<Token>> rangesToFetch);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/CommonRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
new file mode 100644
index 0000000..928e570
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Groups ranges with identical endpoints/transient endpoints; ranges stays mutable so callers can append to it
+ */
+public class CommonRange
+{
+ public final ImmutableSet<InetAddressAndPort> endpoints;
+ public final ImmutableSet<InetAddressAndPort> transEndpoints;
+ public final Collection<Range<Token>> ranges;
+
+ public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges)
+ {
+ Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
+ Preconditions.checkArgument(transEndpoints != null);
+ Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
+ Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
+
+ this.endpoints = ImmutableSet.copyOf(endpoints);
+ this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
+ this.ranges = new ArrayList<>(ranges);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CommonRange that = (CommonRange) o;
+
+ if (!endpoints.equals(that.endpoints)) return false;
+ if (!transEndpoints.equals(that.transEndpoints)) return false;
+ return ranges.equals(that.ranges);
+ }
+
+ public int hashCode()
+ {
+ int result = endpoints.hashCode();
+ result = 31 * result + transEndpoints.hashCode();
+ result = 31 * result + ranges.hashCode();
+ return result;
+ }
+
+ public String toString()
+ {
+ return "CommonRange{" +
+ "endpoints=" + endpoints +
+ ", transEndpoints=" + transEndpoints +
+ ", ranges=" + ranges +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
index 8aa4381..bc614dc 100644
--- a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
@@ -25,8 +25,7 @@ import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
/**
* Keyspace level hook for repair.
@@ -38,5 +37,8 @@ public interface KeyspaceRepairManager
* been notified that the repair session has been completed, the data associated with the given session id must
* not be combined with repaired or unrepaired data, or data from other repair sessions.
*/
- ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor);
+ ListenableFuture prepareIncrementalRepair(UUID sessionID,
+ Collection<ColumnFamilyStore> tables,
+ RangesAtEndpoint tokenRanges,
+ ExecutorService executor);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
deleted file mode 100644
index d7e0387..0000000
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
- */
-public class LocalSyncTask extends SyncTask implements StreamEventHandler
-{
- private final TraceState state = Tracing.instance.get();
-
- private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
-
- private final UUID pendingRepair;
- private final boolean pullRepair;
-
- public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
- {
- super(desc, r1, r2, previewKind);
- this.pendingRepair = pendingRepair;
- this.pullRepair = pullRepair;
- }
-
- @VisibleForTesting
- StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
- {
- StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
- .listeners(this)
- .flushBeforeTransfer(pendingRepair == null)
- .requestRanges(dst, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node
- if (!pullRepair)
- {
- // send ranges to the remote node if we are not performing a pull repair
- plan.transferRanges(dst, desc.keyspace, differences, desc.columnFamily);
- }
-
- return plan;
- }
-
- /**
- * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
- * that will be called out of band once the streams complete.
- */
- @Override
- protected void startSync(List<Range<Token>> differences)
- {
- InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
- InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
-
- String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
- logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
- Tracing.traceRepair(message);
-
- createStreamPlan(dst, differences).execute();
- }
-
- public void handleStreamEvent(StreamEvent event)
- {
- if (state == null)
- return;
- switch (event.eventType)
- {
- case STREAM_PREPARED:
- StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
- state.trace("Streaming session with {} prepared", spe.session.peer);
- break;
- case STREAM_COMPLETE:
- StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
- state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
- break;
- case FILE_PROGRESS:
- ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
- state.trace("{}/{} ({}%) {} idx:{}{}",
- new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
- FBUtilities.prettyPrintMemory(pi.totalBytes),
- pi.currentBytes * 100 / pi.totalBytes,
- pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
- pi.sessionIndex,
- pi.peer });
- }
- }
-
- public void onSuccess(StreamState result)
- {
- String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
- logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
- Tracing.traceRepair(message);
- set(stat.withSummaries(result.createSummaries()));
- finished();
- }
-
- public void onFailure(Throwable t)
- {
- setException(t);
- finished();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
deleted file mode 100644
index 0a47f73..0000000
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.RepairException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.SessionSummary;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * RemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
- * to repair(stream) data with other replica.
- *
- * When RemoteSyncTask receives SyncComplete from remote node, task completes.
- */
-public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
-{
- private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class);
-
- public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
- {
- super(desc, r1, r2, previewKind);
- }
-
- @Override
- protected void startSync(List<Range<Token>> differences)
- {
- InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
- String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
- logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
- Tracing.traceRepair(message);
- MessagingService.instance().sendOneWay(request.createMessage(), request.src);
- }
-
- public void syncComplete(boolean success, List<SessionSummary> summaries)
- {
- if (success)
- {
- set(stat.withSummaries(summaries));
- }
- else
- {
- setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
- }
- finished();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 48973d2..d38435b 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -64,7 +64,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams)
{
this.session = session;
- this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
+ this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.commonRange.ranges);
this.taskExecutor = session.taskExecutor;
this.parallelismDegree = session.parallelismDegree;
this.isIncremental = isIncremental;
@@ -83,7 +83,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
Keyspace ks = Keyspace.open(desc.keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily);
cfs.metric.repairsStarted.inc();
- List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints);
+ List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.commonRange.endpoints);
allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
ListenableFuture<List<TreeResponse>> validations;
@@ -160,13 +160,18 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}, taskExecutor);
}
+ private boolean isTransient(InetAddressAndPort ep)
+ {
+ return session.commonRange.transEndpoints.contains(ep);
+ }
+
private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
{
return trees ->
{
InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
- List<SyncTask> syncTasks = new ArrayList<>();
+ List<AbstractSyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
for (int i = 0; i < trees.size() - 1; ++i)
{
@@ -174,17 +179,29 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
for (int j = i + 1; j < trees.size(); ++j)
{
TreeResponse r2 = trees.get(j);
- SyncTask task;
+
+ if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+ continue;
+
+ AbstractSyncTask task;
if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
{
- task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+ InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
+ task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+ }
+ else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
+ {
+ TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
+ TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1;
+ task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind);
+ session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task);
}
else
{
- task = new RemoteSyncTask(desc, r1, r2, session.previewKind);
- // RemoteSyncTask expects SyncComplete message sent back.
+ task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
+ // SymmetricRemoteSyncTask expects SyncComplete message sent back.
// Register task to RepairSession to receive response.
- session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+ session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task);
}
syncTasks.add(task);
taskExecutor.submit(task);
@@ -200,7 +217,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
- List<AsymmetricSyncTask> syncTasks = new ArrayList<>();
+ List<AbstractSyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
DifferenceHolder diffHolder = new DifferenceHolder(trees);
@@ -215,6 +232,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
for (int i = 0; i < trees.size(); i++)
{
InetAddressAndPort address = trees.get(i).endpoint;
+
+ // we don't stream to transient replicas
+ if (isTransient(address))
+ continue;
+
HostDifferences streamsFor = reducedDifferences.get(address);
if (streamsFor != null)
{
@@ -373,4 +395,4 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}
return Futures.allAsList(tasks);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 90c0146..8d3cd54 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutorService;
@@ -29,8 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -38,8 +36,9 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.junit.internal.runners.statements.Fail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +73,6 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -141,46 +139,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
recordFailure(message, completionMessage);
}
- @VisibleForTesting
- static class CommonRange
- {
- public final Set<InetAddressAndPort> endpoints;
- public final Collection<Range<Token>> ranges;
-
- public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges)
- {
- Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
- Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
- this.endpoints = endpoints;
- this.ranges = ranges;
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- CommonRange that = (CommonRange) o;
-
- if (!endpoints.equals(that.endpoints)) return false;
- return ranges.equals(that.ranges);
- }
-
- public int hashCode()
- {
- int result = endpoints.hashCode();
- result = 31 * result + ranges.hashCode();
- return result;
- }
-
- public String toString()
- {
- return "CommonRange{" +
- "endpoints=" + endpoints +
- ", ranges=" + ranges +
- '}';
- }
- }
protected void runMayThrow() throws Exception
{
@@ -238,20 +196,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
Set<InetAddressAndPort> allNeighbors = new HashSet<>();
List<CommonRange> commonRanges = new ArrayList<>();
- //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
+ //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
//calculation multiple times
- Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
+ // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica
+ Iterable<Range<Token>> keyspaceLocalRanges = storageService
+ .getLocalReplicas(keyspace)
+ .filter(Replica::isFull)
+ .ranges();
try
{
for (Range<Token> range : options.getRanges())
{
- Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
- options.getDataCenters(),
- options.getHosts());
+ EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+ options.getDataCenters(),
+ options.getHosts());
addRangeToNeighbors(commonRanges, range, neighbors);
- allNeighbors.addAll(neighbors);
+ allNeighbors.addAll(neighbors.endpoints());
}
progress.incrementAndGet();
@@ -387,11 +349,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
for (CommonRange commonRange: commonRanges)
{
Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+ Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
+ Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
// this node is implicitly a participant in this repair, so a single endpoint is ok here
if (!endpoints.isEmpty())
{
- filtered.add(new CommonRange(endpoints, commonRange.ranges));
+ filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
}
}
Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
@@ -514,14 +478,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
// we do endpoint filtering at the start of an incremental repair,
// so repair sessions shouldn't also be checking liveness
boolean force = options.isForcedRepair() && !isIncremental;
- for (CommonRange cr : commonRanges)
+ for (CommonRange commonRange : commonRanges)
{
- logger.info("Starting RepairSession for {}", cr);
+ logger.info("Starting RepairSession for {}", commonRange);
RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
- cr.ranges,
+ commonRange,
keyspace,
options.getParallelism(),
- cr.endpoints,
isIncremental,
options.isPullRepair(),
force,
@@ -559,7 +522,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
public void onSuccess(RepairSessionResult result)
{
String message = String.format("Repair session %s for range %s finished", session.getId(),
- session.getRanges().toString());
+ session.ranges().toString());
logger.info(message);
fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
progress.incrementAndGet(),
@@ -572,7 +535,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
StorageMetrics.repairExceptions.inc();
String message = String.format("Repair session %s for range %s failed with error %s",
- session.getId(), session.getRanges().toString(), t.getMessage());
+ session.getId(), session.ranges().toString(), t.getMessage());
logger.error(message, t);
fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR,
progress.incrementAndGet(),
@@ -684,13 +647,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
ImmutableList.of(failureMessage, completionMessage));
}
- private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors)
+ private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
{
+ Set<InetAddressAndPort> endpoints = neighbors.endpoints();
+ Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
for (int i = 0; i < neighborRangeList.size(); i++)
{
CommonRange cr = neighborRangeList.get(i);
- if (cr.endpoints.containsAll(neighbors))
+ if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints))
{
cr.ranges.add(range);
return;
@@ -699,7 +664,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(range);
- neighborRangeList.add(new CommonRange(neighbors, ranges));
+ neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
}
private Thread createQueryThread(final int cmd, final UUID sessionId)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index ec06f37..2ff60ec 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -55,8 +55,8 @@ import org.apache.cassandra.utils.Pair;
* validationComplete()).
* </li>
* <li>Synchronization phase: once all trees are received, the job compares each tree with
- * all the other using a so-called {@link SyncTask}. If there is difference between 2 trees, the
- * concerned SyncTask will start a streaming of the difference between the 2 endpoint concerned.
+ * all the others using a so-called {@link SymmetricSyncTask}. If there is a difference between 2 trees, the
+ * concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoints concerned.
* </li>
* </ol>
* The job is done once all its SyncTasks are done (i.e. have either computed no differences
@@ -74,7 +74,7 @@ import org.apache.cassandra.utils.Pair;
* we still first send a message to each node to flush and snapshot data so each merkle tree
* creation is still done on similar data, even if the actual creation is not
* done simultaneously). If not sequential, all merkle trees are requested in parallel.
- * Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle
+ * Similarly, if a job is sequential, it will handle one SymmetricSyncTask at a time, but will handle
* all of them in parallel otherwise.
*/
public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
@@ -94,8 +94,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final boolean skippedReplicas;
/** Range to repair */
- public final Collection<Range<Token>> ranges;
- public final Set<InetAddressAndPort> endpoints;
+ public final CommonRange commonRange;
public final boolean isIncremental;
public final PreviewKind previewKind;
@@ -114,23 +113,20 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
/**
* Create new repair session.
- *
* @param parentRepairSession the parent sessions id
* @param id this sessions id
- * @param ranges ranges to repair
+ * @param commonRange the endpoints and token ranges to repair
* @param keyspace name of keyspace
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
- * @param endpoints the data centers that should be part of the repair; null for all DCs
* @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
* @param force true if the repair should ignore dead endpoints (instead of failing)
* @param cfnames names of columnfamilies
*/
public RepairSession(UUID parentRepairSession,
UUID id,
- Collection<Range<Token>> ranges,
+ CommonRange commonRange,
String keyspace,
RepairParallelism parallelismDegree,
- Set<InetAddressAndPort> endpoints,
boolean isIncremental,
boolean pullRepair,
boolean force,
@@ -145,7 +141,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
- this.ranges = ranges;
//If force then filter out dead endpoints
boolean forceSkippedReplicas = false;
@@ -153,7 +148,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
logger.debug("force flag set, removing dead endpoints");
final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
- for (final InetAddressAndPort endpoint : endpoints)
+ for (final InetAddressAndPort endpoint : commonRange.endpoints)
{
if (!FailureDetector.instance.isAlive(endpoint))
{
@@ -166,12 +161,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// we shouldn't be recording a successful repair if
// any replicas are excluded from the repair
forceSkippedReplicas = true;
- endpoints = new HashSet<>(endpoints);
- endpoints.removeAll(removeCandidates);
+ Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints);
+ filteredEndpoints.removeAll(removeCandidates);
+ commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges);
}
}
- this.endpoints = endpoints;
+ this.commonRange = commonRange;
this.isIncremental = isIncremental;
this.previewKind = previewKind;
this.pullRepair = pullRepair;
@@ -184,9 +180,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
return id;
}
- public Collection<Range<Token>> getRanges()
+ public Collection<Range<Token>> ranges()
+ {
+ return commonRange.ranges;
+ }
+
+ public Collection<InetAddressAndPort> endpoints()
{
- return ranges;
+ return commonRange.endpoints;
}
public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task)
@@ -247,7 +248,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
StringBuilder sb = new StringBuilder();
sb.append(FBUtilities.getBroadcastAddressAndPort());
- for (InetAddressAndPort ep : endpoints)
+ for (InetAddressAndPort ep : commonRange.endpoints)
sb.append(", ").append(ep);
return sb.toString();
}
@@ -266,18 +267,18 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
if (terminated)
return;
- logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames));
- Tracing.traceRepair("Syncing range {}", ranges);
+ logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
+ Tracing.traceRepair("Syncing range {}", commonRange);
if (!previewKind.isPreview())
{
- SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints);
+ SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, commonRange);
}
- if (endpoints.isEmpty())
+ if (commonRange.endpoints.isEmpty())
{
- logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
+ logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange));
Tracing.traceRepair(message);
- set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
+ set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
if (!previewKind.isPreview())
{
SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -286,7 +287,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
}
// Checking all nodes are live
- for (InetAddressAndPort endpoint : endpoints)
+ for (InetAddressAndPort endpoint : commonRange.endpoints)
{
if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
{
@@ -318,8 +319,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
// this repair session is completed
logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
- Tracing.traceRepair("Completed sync of range {}", ranges);
- set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas));
+ Tracing.traceRepair("Completed sync of range {}", commonRange);
+ set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas));
taskExecutor.shutdown();
// mark this session as terminated
@@ -372,7 +373,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public void convict(InetAddressAndPort endpoint, double phi)
{
- if (!endpoints.contains(endpoint))
+ if (!commonRange.endpoints.contains(endpoint))
return;
// We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 5d2b2ec..e9cba89 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Collection;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,9 +79,12 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't necessary
- .requestRanges(dest, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node
+ // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+ .requestRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges),
+ RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node
if (!asymmetric)
- sp.transferRanges(dest, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node
+ // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+ sp.transferRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges), desc.columnFamily); // send ranges to the remote node
return sp;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org