You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/16 18:48:20 UTC
[1/2] git commit: Coverage analysis for low CL queries
Coverage analysis for low CL queries
patch by slebresne; reviewed by jbellis for CASSANDRA-4858
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52671128
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52671128
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52671128
Branch: refs/heads/trunk
Commit: 526711288c30a6824a8e29805af7a2b1d72b9129
Parents: 994d262
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 16 18:43:20 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 16 18:45:32 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 12 +
.../cassandra/config/DatabaseDescriptor.java | 19 ++
.../cassandra/config/ReadRepairDecision.java | 23 ++
.../org/apache/cassandra/db/ConsistencyLevel.java | 174 ++++++++++++++-
.../org/apache/cassandra/dht/AbstractBounds.java | 2 +
src/java/org/apache/cassandra/dht/Bounds.java | 5 +
.../org/apache/cassandra/dht/ExcludingBounds.java | 5 +
.../cassandra/dht/IncludingExcludingBounds.java | 5 +
src/java/org/apache/cassandra/dht/Range.java | 5 +
.../cassandra/locator/AbstractEndpointSnitch.java | 24 ++
.../locator/AbstractReplicationStrategy.java | 28 ++-
.../cassandra/locator/DynamicEndpointSnitch.java | 30 +++
.../apache/cassandra/locator/IEndpointSnitch.java | 8 +-
.../apache/cassandra/locator/LocalStrategy.java | 1 +
.../cassandra/locator/NetworkTopologyStrategy.java | 1 +
.../locator/OldNetworkTopologyStrategy.java | 1 +
.../apache/cassandra/locator/SimpleStrategy.java | 1 +
.../service/AbstractWriteResponseHandler.java | 38 ++-
.../cassandra/service/DatacenterReadCallback.java | 104 ---------
.../DatacenterSyncWriteResponseHandler.java | 45 +----
.../service/DatacenterWriteResponseHandler.java | 28 +--
.../org/apache/cassandra/service/ReadCallback.java | 93 ++-------
.../org/apache/cassandra/service/StorageProxy.java | 121 ++++++++---
.../apache/cassandra/service/StorageService.java | 6 +-
.../cassandra/service/WriteResponseHandler.java | 45 +---
.../ReplicationStrategyEndpointCacheTest.java | 2 +-
.../cassandra/locator/SimpleStrategyTest.java | 10 -
28 files changed, 481 insertions(+), 356 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e40b6..49b86b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -37,6 +37,7 @@
* Fix inserting empty maps (CASSANDRA-5141)
* Don't remove tokens from System table for node we know (CASSANDRA-5121)
* fix streaming progress report for compressed files (CASSANDRA-5130)
+ * Coverage analysis for low-CL queries (CASSANDRA-4858)
Merged from 1.1:
* Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
* Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 5f6c1fe..82d49a9 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -465,6 +465,18 @@ public final class CFMetaData
return dcLocalReadRepairChance;
}
+    /**
+     * Draws one random number and decides which kind of read repair, if any,
+     * the current read should perform.
+     *
+     * @return GLOBAL if the draw falls below the read repair chance, otherwise
+     *         DC_LOCAL if it falls below the dc-local chance, otherwise NONE
+     */
+    public ReadRepairDecision newReadRepairDecision()
+    {
+        final double roll = FBUtilities.threadLocalRandom().nextDouble();
+
+        // The same draw is compared against both chances, the global one first.
+        if (roll < getReadRepairChance())
+            return ReadRepairDecision.GLOBAL;
+
+        return roll < getDcLocalReadRepair()
+             ? ReadRepairDecision.DC_LOCAL
+             : ReadRepairDecision.NONE;
+    }
+
public boolean getReplicateOnWrite()
{
return replicateOnWrite;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c9b2d1e..7c33203 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -88,6 +88,7 @@ public class DatabaseDescriptor
private static IRowCacheProvider rowCacheProvider;
private static String localDC;
+ private static Comparator<InetAddress> localComparator;
/**
* Inspect the classpath to find storage configuration file
@@ -338,6 +339,19 @@ public class DatabaseDescriptor
EndpointSnitchInfo.create();
localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+        // Orders endpoints so that those in the local datacenter sort first; two
+        // endpoints on the same side of that split compare as equal.
+        localComparator = new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress endpoint1, InetAddress endpoint2)
+            {
+                boolean firstIsLocal = localDC.equals(snitch.getDatacenter(endpoint1));
+                boolean secondIsLocal = localDC.equals(snitch.getDatacenter(endpoint2));
+                if (firstIsLocal == secondIsLocal)
+                    return 0;
+                return firstIsLocal ? -1 : 1;
+            }
+        };
/* Request Scheduler setup */
requestSchedulerOptions = conf.request_scheduler_options;
@@ -1252,6 +1266,11 @@ public class DatabaseDescriptor
return localDC;
}
+ public static Comparator<InetAddress> getLocalComparator() // accessor for the comparator that sorts local-datacenter endpoints first
+ {
+ return localComparator;
+ }
+
public static Config.InternodeCompression internodeCompression()
{
return conf.internode_compression;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/config/ReadRepairDecision.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ReadRepairDecision.java b/src/java/org/apache/cassandra/config/ReadRepairDecision.java
new file mode 100644
index 0000000..1b4b648
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/ReadRepairDecision.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+public enum ReadRepairDecision // outcome of the per-read draw made by CFMetaData.newReadRepairDecision()
+{
+ NONE, GLOBAL, DC_LOCAL; // no read repair / read repair chance hit / dc-local read repair chance hit
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 53acc5a..56a7896 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,13 +17,27 @@
*/
package org.apache.cassandra.db;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ReadRepairDecision;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.transport.ProtocolException;
+
public enum ConsistencyLevel
{
ANY (0),
@@ -35,6 +49,8 @@ public enum ConsistencyLevel
LOCAL_QUORUM(6),
EACH_QUORUM (7);
+ private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);
+
// Used by the binary protocol
public final int code;
private static final ConsistencyLevel[] codeIdx;
@@ -64,9 +80,13 @@ public enum ConsistencyLevel
return codeIdx[code];
}
- public int blockFor(String table)
+    /**
+     * Computes the quorum size for a single datacenter: half of that datacenter's
+     * replication factor (integer division) plus one.
+     */
+    private int localQuorumFor(Table table, String dc)
+    {
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+        int replicationFactor = strategy.getReplicationFactor(dc);
+        return replicationFactor / 2 + 1;
+    }
+
+ public int blockFor(Table table)
{
- NetworkTopologyStrategy strategy = null;
switch (this)
{
case ONE:
@@ -78,23 +98,161 @@ public enum ConsistencyLevel
case THREE:
return 3;
case QUORUM:
- return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
case ALL:
- return Table.open(table).getReplicationStrategy().getReplicationFactor();
+ return table.getReplicationStrategy().getReplicationFactor();
case LOCAL_QUORUM:
- strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
- return (strategy.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter()) / 2) + 1;
+ return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
case EACH_QUORUM:
- strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
int n = 0;
for (String dc : strategy.getDatacenters())
- n += (strategy.getReplicationFactor(dc) / 2) + 1;
+ n += localQuorumFor(table, dc);
return n;
default:
throw new UnsupportedOperationException("Invalid consistency level: " + toString());
}
}
+    /** Tells whether the snitch places {@code endpoint} in this node's datacenter. */
+    private boolean isLocal(InetAddress endpoint)
+    {
+        String localDc = DatabaseDescriptor.getLocalDataCenter();
+        String endpointDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
+        return localDc.equals(endpointDc);
+    }
+
+    /** Counts how many of the given endpoints are in the local datacenter. */
+    private int countLocalEndpoints(Iterable<InetAddress> liveEndpoints)
+    {
+        int localCount = 0;
+        for (InetAddress candidate : liveEndpoints)
+            localCount += isLocal(candidate) ? 1 : 0;
+        return localCount;
+    }
+
+    /**
+     * Counts the given endpoints per datacenter.
+     *
+     * Every datacenter known to the table's NetworkTopologyStrategy gets an entry,
+     * starting at zero, so datacenters without any live endpoint are still reported.
+     *
+     * @param table the keyspace; its replication strategy must be a NetworkTopologyStrategy
+     * @param liveEndpoints the endpoints to count
+     * @return a map from datacenter name to the number of given endpoints located in it
+     */
+    private Map<String, Integer> countPerDCEndpoints(Table table, Iterable<InetAddress> liveEndpoints)
+    {
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+
+        Map<String, Integer> dcEndpoints = new HashMap<String, Integer>();
+        for (String dc: strategy.getDatacenters())
+            dcEndpoints.put(dc, 0);
+
+        for (InetAddress endpoint : liveEndpoints)
+        {
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
+            Integer countSoFar = dcEndpoints.get(dc);
+            // An endpoint whose datacenter the strategy does not list cannot help any
+            // per-datacenter quorum; skip it instead of failing on a null unboxing.
+            if (countSoFar == null)
+                continue;
+            dcEndpoints.put(dc, countSoFar + 1);
+        }
+        return dcEndpoints;
+    }
+
+ public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints) // convenience overload: filters as if no read repair had been chosen
+ {
+ return filterForQuery(table, liveEndpoints, ReadRepairDecision.NONE);
+ }
+
+ public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+ {
+ /*
+ * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
+ * For LOCAL_QORUM, move local-DC replicas in front first as we need them there whether
+ * we do read repair (since the first replica gets the data read) or not (since we'll take
+ * the blockFor first ones).
+ */
+ if (this == LOCAL_QUORUM)
+ Collections.sort(liveEndpoints, DatabaseDescriptor.getLocalComparator());
+
+ switch (readRepair)
+ {
+ case NONE:
+ return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(table)));
+ case GLOBAL:
+ return liveEndpoints;
+ case DC_LOCAL:
+ List<InetAddress> local = new ArrayList<InetAddress>();
+ List<InetAddress> other = new ArrayList<InetAddress>();
+ for (InetAddress add : liveEndpoints)
+ {
+ if (isLocal(add))
+ local.add(add);
+ else
+ other.add(add);
+ }
+ // check if blockfor more than we have localep's
+ int blockFor = blockFor(table);
+ if (local.size() < blockFor)
+ local.addAll(other.subList(0, Math.min(blockFor - local.size(), other.size())));
+ return local;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ public boolean isSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints)
+ {
+ switch (this)
+ {
+ case ANY:
+ // local hint is acceptable, and local node is always live
+ return true;
+ case LOCAL_QUORUM:
+ return countLocalEndpoints(liveEndpoints) >= blockFor(table);
+ case EACH_QUORUM:
+ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ {
+ if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+ return false;
+ }
+ return true;
+ default:
+ return Iterables.size(liveEndpoints) >= blockFor(table);
+ }
+ }
+
+    /**
+     * Checks that the given live endpoints are enough to satisfy this consistency level.
+     *
+     * @param table the keyspace whose replication settings determine how many replicas are required
+     * @param liveEndpoints the replicas currently considered alive
+     * @throws UnavailableException if too few live replicas are available; for LOCAL_QUORUM only
+     *         local-datacenter replicas count, and for EACH_QUORUM the first datacenter found
+     *         short of its own quorum is reported
+     */
+    public void assureSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints) throws UnavailableException
+    {
+        int blockFor = blockFor(table);
+        switch (this)
+        {
+            case ANY:
+                // local hint is acceptable, and local node is always live
+                break;
+            case LOCAL_QUORUM:
+                int localLive = countLocalEndpoints(liveEndpoints);
+                if (localLive < blockFor)
+                {
+                    if (logger.isDebugEnabled())
+                    {
+                        StringBuilder builder = new StringBuilder("Local replicas [");
+                        // Separator goes between entries only, so the list has no trailing comma.
+                        String separator = "";
+                        for (InetAddress endpoint : liveEndpoints)
+                        {
+                            if (isLocal(endpoint))
+                            {
+                                builder.append(separator).append(endpoint);
+                                separator = ",";
+                            }
+                        }
+                        builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockFor).append(" live nodes in '").append(DatabaseDescriptor.getLocalDataCenter()).append("'");
+                        logger.debug(builder.toString());
+                    }
+                    throw new UnavailableException(this, blockFor, localLive);
+                }
+                break;
+            case EACH_QUORUM:
+                // Each datacenter is checked against its own quorum, not the overall blockFor.
+                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                {
+                    int dcBlockFor = localQuorumFor(table, entry.getKey());
+                    int dcLive = entry.getValue();
+                    if (dcLive < dcBlockFor)
+                        throw new UnavailableException(this, dcBlockFor, dcLive);
+                }
+                break;
+            default:
+                int live = Iterables.size(liveEndpoints);
+                if (live < blockFor)
+                {
+                    logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor);
+                    throw new UnavailableException(this, blockFor, live);
+                }
+                break;
+        }
+    }
+
public void validateForRead(String table) throws InvalidRequestException
{
switch (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ce85d..efa886c 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -123,6 +123,8 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ
*/
public abstract AbstractBounds<Token> toTokenBounds();
+ public abstract AbstractBounds<T> withNewRight(T newRight); // returns bounds of the same concrete kind that keep this left bound but end at newRight
+
public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>>
{
public void serialize(AbstractBounds<?> range, DataOutput out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index bf579ae..fedc407 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -115,4 +115,9 @@ public class Bounds<T extends RingPosition> extends AbstractBounds<T>
{
return (left instanceof RowPosition) ? new Bounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Bounds<Token>)this;
}
+
+    /**
+     * Returns a new {@code Bounds} that keeps this instance's left bound but ends at
+     * {@code newRight}. This instance's partitioner is carried over, as
+     * {@code toTokenBounds()} does, rather than falling back to the two-argument constructor.
+     */
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new Bounds<T>(left, newRight, partitioner);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 7823023..5f46015 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -104,4 +104,9 @@ public class ExcludingBounds<T extends RingPosition> extends AbstractBounds<T>
{
return (left instanceof RowPosition) ? new ExcludingBounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (ExcludingBounds<Token>)this;
}
+
+    /**
+     * Returns a new {@code ExcludingBounds} that keeps this instance's left bound but ends at
+     * {@code newRight}. This instance's partitioner is carried over, as
+     * {@code toTokenBounds()} does, rather than falling back to the two-argument constructor.
+     */
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new ExcludingBounds<T>(left, newRight, partitioner);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index 0b1cb0b..561555b 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -103,4 +103,9 @@ public class IncludingExcludingBounds<T extends RingPosition> extends AbstractBo
{
return (left instanceof RowPosition) ? new IncludingExcludingBounds<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (IncludingExcludingBounds<Token>)this;
}
+
+    /**
+     * Returns a new {@code IncludingExcludingBounds} that keeps this instance's left bound but
+     * ends at {@code newRight}. This instance's partitioner is carried over, as
+     * {@code toTokenBounds()} does, rather than falling back to the two-argument constructor.
+     */
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new IncludingExcludingBounds<T>(left, newRight, partitioner);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index a750ae2..99286fe 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -468,4 +468,9 @@ public class Range<T extends RingPosition> extends AbstractBounds<T> implements
{
return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this;
}
+
+    /**
+     * Returns a new {@code Range} that keeps this instance's left bound but ends at
+     * {@code newRight}. This instance's partitioner is carried over, as
+     * {@code toTokenBounds()} does, rather than falling back to the two-argument constructor.
+     */
+    public AbstractBounds<T> withNewRight(T newRight)
+    {
+        return new Range<T>(left, newRight, partitioner);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 0976cc8..546d15e 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.util.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
public abstract class AbstractEndpointSnitch implements IEndpointSnitch
{
public abstract int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
@@ -57,4 +59,26 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
{
// noop by default
}
+
+    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+    {
+        // A query to a remote DC is likely an order of magnitude slower than a local
+        // one, so two purely local queries should still beat a single merged query
+        // that has to involve remote nodes.
+        if (!hasRemoteNode(merged))
+            return true;
+
+        // The merged query involves a remote node: merging is only worthwhile when
+        // at least one of the two separate queries would have involved one as well.
+        return hasRemoteNode(l1) || hasRemoteNode(l2);
+    }
+
+ private boolean hasRemoteNode(List<InetAddress> l)
+ {
+ String localDc = DatabaseDescriptor.getLocalDataCenter();
+ for (InetAddress ep : l)
+ {
+ if (!localDc.equals(getDatacenter(ep)))
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 3c30baa..4b54d94 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -21,12 +21,14 @@ import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
@@ -46,22 +48,25 @@ public abstract class AbstractReplicationStrategy
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- public final String table;
+ @VisibleForTesting
+ final String tableName;
+ private Table table;
public final Map<String, String> configOptions;
private final TokenMetadata tokenMetadata;
public IEndpointSnitch snitch;
- AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
+ AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
- assert table != null;
+ assert tableName != null;
assert snitch != null;
assert tokenMetadata != null;
this.tokenMetadata = tokenMetadata;
this.snitch = snitch;
this.tokenMetadata.register(this);
this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
- this.table = table;
+ this.tableName = tableName;
+ // lazy-initialize table itself since we don't create them until after the replication strategies
}
private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
@@ -120,13 +125,20 @@ public abstract class AbstractReplicationStrategy
if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
{
// block for in this context will be localnodes block.
- return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+ return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
{
- return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+ return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
- return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
+ return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ }
+
+ private Table getTable() // resolves the Table for tableName on first use and caches it in the table field
+ {
+ if (table == null) // not opened yet; NOTE(review): unsynchronized check on a non-volatile field, confirm concurrent callers are acceptable
+ table = Table.open(tableName);
+ return table;
 }
/**
@@ -266,7 +278,7 @@ public abstract class AbstractReplicationStrategy
for (String key : configOptions.keySet())
{
if (!expectedOptions.contains(key))
- throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), table));
+ throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), tableName));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 6d03391..40aad20 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -306,4 +306,34 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
}
+    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+    {
+        // The wrapped snitch can veto the merge outright.
+        boolean subsnitchApproves = subsnitch.isWorthMergingForRangeQuery(merged, l1, l2);
+        if (!subsnitchApproves)
+            return false;
+
+        double mergedMax = maxScore(merged);
+        double firstMax = maxScore(l1);
+        double secondMax = maxScore(l2);
+
+        // If any of the lists has no score at all, keep the subsnitch decision
+        // (which is true if we got this far).
+        boolean scoreMissing = mergedMax < 0 || firstMax < 0 || secondMax < 0;
+        if (scoreMissing)
+            return true;
+
+        return mergedMax < firstMax + secondMax;
+    }
+
+    // Returns the highest score among the given endpoints, or -1.0 if none of them has a score.
+    private double maxScore(List<InetAddress> endpoints)
+    {
+        double highest = -1.0;
+        for (InetAddress candidate : endpoints)
+        {
+            Double known = scores.get(candidate);
+            if (known != null && known > highest)
+                highest = known;
+        }
+        return highest;
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 5767d1e..690b84f 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -58,4 +58,10 @@ public interface IEndpointSnitch
* called after Gossiper instance exists immediately before it starts gossiping
*/
public void gossiperStarting();
-}
\ No newline at end of file
+
+ /**
+ * Returns whether for a range query doing a query against merged is likely
+ * to be faster than 2 sequential queries, one against l1 followed by one against l2.
+ *
+ * @param merged the endpoints for the single, merged query
+ * @param l1 the endpoints for the first of the two separate queries
+ * @param l2 the endpoints for the second of the two separate queries
+ * @return true if the merged query is expected to be the faster option
+ */
+ public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index 4aafa01..ab580f1 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 6291da0..ad43197 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index f07328b..2e3e3e8 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 5d8a723..17d171e 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 2ad2849..c792819 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -21,17 +21,31 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.utils.SimpleCondition;
public abstract class AbstractWriteResponseHandler implements IAsyncCallback
{
+    /**
+     * Matches the endpoints the failure detector reports as alive. Declared final
+     * because it is shared by every handler and never reassigned.
+     */
+    private static final Predicate<InetAddress> isAlive = new Predicate<InetAddress>()
+    {
+        public boolean apply(InetAddress endpoint)
+        {
+            return FailureDetector.instance.isAlive(endpoint);
+        }
+    };
+
private final SimpleCondition condition = new SimpleCondition();
+ protected final Table table;
protected final long startTime;
protected final Collection<InetAddress> naturalEndpoints;
protected final ConsistencyLevel consistencyLevel;
@@ -43,14 +57,16 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
* @param pendingEndpoints
* @param callback A callback to be called when the write is successful.
*/
- protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+ protected AbstractWriteResponseHandler(Table table,
+ Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints,
ConsistencyLevel consistencyLevel,
Runnable callback,
WriteType writeType)
{
+ this.table = table;
this.pendingEndpoints = pendingEndpoints;
- startTime = System.currentTimeMillis();
+ this.startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
this.naturalEndpoints = naturalEndpoints;
this.callback = callback;
@@ -72,22 +88,18 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
}
if (!success)
- throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), blockFor());
+ throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), consistencyLevel.blockFor(table) + pendingEndpoints.size());
}
protected abstract int ackCount();
- protected int blockFor()
- {
- return blockForCL() + pendingEndpoints.size();
- }
-
- protected abstract int blockForCL();
-
/** null message means "response from local write" */
public abstract void response(MessageIn msg);
- public abstract void assureSufficientLiveNodes() throws UnavailableException;
+ /**
+ * Verifies that the live replicas can satisfy this handler's consistency level.
+ * Candidates are the natural and pending endpoints combined, restricted to those accepted by
+ * the isAlive predicate; the actual decision is delegated to the consistency level.
+ *
+ * @throws UnavailableException propagated from the consistency level's check
+ */
+ public void assureSufficientLiveNodes() throws UnavailableException
+ {
+ // Both views are lazy: liveness is only evaluated when the consistency level iterates them.
+ Iterable<InetAddress> targets = Iterables.concat(naturalEndpoints, pendingEndpoints);
+ Iterable<InetAddress> liveTargets = Iterables.filter(targets, isAlive);
+ consistencyLevel.assureSufficientLiveNodes(table, liveTargets);
+ }
protected void signal()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
deleted file mode 100644
index e1ae652..0000000
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.thrift.protocol.TMessage;
-
-/**
- * Datacenter Quorum response handler blocks for a quorum of responses from the local DC
- */
-public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TMessage, TResolved>
-{
- private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
- {
- public int compare(InetAddress endpoint1, InetAddress endpoint2)
- {
- boolean local1 = localdc.equals(snitch.getDatacenter(endpoint1));
- boolean local2 = localdc.equals(snitch.getDatacenter(endpoint2));
- if (local1 && !local2)
- return -1;
- if (local2 && !local1)
- return 1;
- return 0;
- }
- };
-
- public DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
- {
- super(resolver, consistencyLevel, command, endpoints);
- }
-
- protected DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
- {
- super(resolver, consistencyLevel, blockfor, command, endpoints);
- }
-
- @Override
- public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
- {
- return new DatacenterReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
- }
-
- @Override
- protected void sortForConsistencyLevel(List<InetAddress> endpoints)
- {
- Collections.sort(endpoints, localComparator);
- }
-
- @Override
- protected boolean waitingFor(MessageIn message)
- {
- return localdc.equals(snitch.getDatacenter(message.from));
- }
-
- @Override
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- int localEndpoints = 0;
- for (InetAddress endpoint : endpoints)
- {
- if (localdc.equals(snitch.getDatacenter(endpoint)))
- localEndpoints++;
- }
-
- if (localEndpoints < blockfor)
- {
- if (logger.isDebugEnabled())
- {
- StringBuilder builder = new StringBuilder("Local replicas [");
- for (InetAddress endpoint : endpoints)
- {
- if (localdc.equals(snitch.getDatacenter(endpoint)))
- builder.append(endpoint).append(",");
- }
- builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockfor).append(" live nodes in '").append(localdc).append("'");
- logger.debug(builder.toString());
- }
-
- throw new UnavailableException(consistencyLevel, blockfor, localEndpoints);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 995b5bf..55e833d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -43,29 +43,21 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
{
private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc;
- static
- {
- localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
- }
-
- private final String table;
private final NetworkTopologyStrategy strategy;
private final HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
public DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints,
ConsistencyLevel consistencyLevel,
- String table,
+ Table table,
Runnable callback,
WriteType writeType)
{
// Response is been managed by the map so make it 1 for the superclass.
- super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+ super(table, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
- this.table = table;
- strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+ strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
for (String dc : strategy.getDatacenters())
{
@@ -77,7 +69,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
public void response(MessageIn message)
{
String dataCenter = message == null
- ? localdc
+ ? DatabaseDescriptor.getLocalDataCenter()
: snitch.getDatacenter(message.from);
responses.get(dataCenter).getAndDecrement();
@@ -92,11 +84,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
signal();
}
- protected int blockForCL()
- {
- return consistencyLevel.blockFor(table);
- }
-
protected int ackCount()
{
int n = 0;
@@ -109,30 +96,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
return n;
}
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
- for (String dc: strategy.getDatacenters())
- dcEndpoints.put(dc, new AtomicInteger());
-
- for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
- {
- if (FailureDetector.instance.isAlive(destination))
- {
- // figure out the destination dc
- String destinationDC = snitch.getDatacenter(destination);
- dcEndpoints.get(destinationDC).incrementAndGet();
- }
- }
-
- // Throw exception if any of the DC doesn't have livenodes to accept write.
- for (String dc: strategy.getDatacenters())
- {
- if (dcEndpoints.get(dc).get() < responses.get(dc).get())
- throw new UnavailableException(consistencyLevel, responses.get(dc).get(), dcEndpoints.get(dc).get());
- }
- }
-
public boolean isLatencyForSnitch()
{
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index cc76231..3fa674f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.IEndpointSnitch;
@@ -38,16 +39,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
{
private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc;
- static
- {
- localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
- }
-
public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints,
ConsistencyLevel consistencyLevel,
- String table,
+ Table table,
Runnable callback,
WriteType writeType)
{
@@ -58,27 +53,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
@Override
public void response(MessageIn message)
{
- if (message == null || localdc.equals(snitch.getDatacenter(message.from)))
+ if (message == null || DatabaseDescriptor.getLocalDataCenter().equals(snitch.getDatacenter(message.from)))
{
if (responses.decrementAndGet() == 0)
signal();
}
}
-
- @Override
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- int liveNodes = 0;
- for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
- {
- if (localdc.equals(snitch.getDatacenter(destination)) && FailureDetector.instance.isAlive(destination))
- liveNodes++;
- }
-
- if (liveNodes < responses.get())
- {
- throw new UnavailableException(consistencyLevel, responses.get(), liveNodes);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index e12859a..f1ca96e 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -34,9 +34,9 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -46,43 +46,34 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
-import com.google.common.collect.Lists;
-
public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
- protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-
public final IResponseResolver<TMessage, TResolved> resolver;
- protected final SimpleCondition condition = new SimpleCondition();
+ private final SimpleCondition condition = new SimpleCondition();
private final long startTime;
- protected final int blockfor;
+ private final int blockfor;
final List<InetAddress> endpoints;
- protected final IReadCommand command;
- protected final ConsistencyLevel consistencyLevel;
- protected final AtomicInteger received = new AtomicInteger(0);
+ private final IReadCommand command;
+ private final ConsistencyLevel consistencyLevel;
+ private final AtomicInteger received = new AtomicInteger(0);
+ private final Table table; // TODO push this into ConsistencyLevel?
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+ public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> filteredEndpoints)
{
- this.command = command;
- this.blockfor = consistencyLevel.blockFor(command.getKeyspace());
- this.resolver = resolver;
- this.startTime = System.currentTimeMillis();
- this.consistencyLevel = consistencyLevel;
- sortForConsistencyLevel(endpoints);
- this.endpoints = filterEndpoints(endpoints);
+ this(resolver, consistencyLevel, consistencyLevel.blockFor(Table.open(command.getKeyspace())), command, Table.open(command.getKeyspace()), filteredEndpoints);
if (logger.isTraceEnabled())
logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
}
- protected ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+ private ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Table table, List<InetAddress> endpoints)
{
this.command = command;
+ this.table = table;
this.blockfor = blockfor;
this.consistencyLevel = consistencyLevel;
this.resolver = resolver;
@@ -92,54 +83,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
{
- return new ReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
- }
-
- /**
- * Endpoints is already restricted to live replicas, sorted by snitch preference. This is a hook for
- * DatacenterReadCallback to move local-DC replicas to the front of the list. We need this both
- * when doing read repair (because the first replica gets the data read) and otherwise (because
- * only the first 1..blockfor replicas will get digest reads).
- */
- protected void sortForConsistencyLevel(List<InetAddress> endpoints)
- {
- // no-op except in DRC
- }
-
- private List<InetAddress> filterEndpoints(List<InetAddress> ep)
- {
- if (resolver instanceof RowDigestResolver)
- {
- assert command instanceof ReadCommand : command;
- String table = ((RowDigestResolver) resolver).table;
- String columnFamily = ((ReadCommand) command).getColumnFamilyName();
- CFMetaData cfmd = Schema.instance.getTableMetaData(table).get(columnFamily);
- double chance = FBUtilities.threadLocalRandom().nextDouble();
-
- // if global repair then just return all the ep's
- if (cfmd.getReadRepairChance() > chance)
- return ep;
-
- // if local repair then just return localDC ep's
- if (cfmd.getDcLocalReadRepair() > chance)
- {
- List<InetAddress> local = Lists.newArrayList();
- List<InetAddress> other = Lists.newArrayList();
- for (InetAddress add : ep)
- {
- if (snitch.getDatacenter(add).equals(localdc))
- local.add(add);
- else
- other.add(add);
- }
- // check if blockfor more than we have localep's
- if (local.size() < blockfor)
- local.addAll(other.subList(0, Math.min(blockfor - local.size(), other.size())));
- return local;
- }
- }
- // we don't read repair on range scans
- return ep.subList(0, Math.min(ep.size(), blockfor));
+ return new ReadCallback(newResolver, consistencyLevel, blockfor, command, table, endpoints);
}
public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
@@ -177,9 +121,11 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
/**
* @return true if the message counts towards the blockfor threshold
*/
- protected boolean waitingFor(MessageIn message)
+ private boolean waitingFor(MessageIn message)
{
- return true;
+ return consistencyLevel == ConsistencyLevel.LOCAL_QUORUM
+ ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(message.from))
+ : true;
}
public void response(TMessage result)
@@ -207,12 +153,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public void assureSufficientLiveNodes() throws UnavailableException
{
- if (endpoints.size() < blockfor)
- {
- logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)",
- StringUtils.join(endpoints, ", "), blockfor);
- throw new UnavailableException(consistencyLevel, blockfor, endpoints.size());
- }
+ consistencyLevel.assureSufficientLiveNodes(table, endpoints);
}
public boolean isLatencyForSnitch()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0fb7ec4..df09171 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
@@ -310,7 +311,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
Collections.<InetAddress>emptyList(),
ConsistencyLevel.ONE,
- Table.SYSTEM_KS,
+ Table.open(Table.SYSTEM_KS),
null,
WriteType.BATCH_LOG);
updateBatchlog(rm, endpoints, handler);
@@ -321,7 +322,12 @@ public class StorageProxy implements StorageProxyMBean
{
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
- AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
+ AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
+ Collections.<InetAddress>emptyList(),
+ ConsistencyLevel.ANY,
+ Table.open(Table.SYSTEM_KS),
+ null,
+ WriteType.SIMPLE);
updateBatchlog(rm, endpoints, handler);
}
@@ -716,8 +722,9 @@ public class StorageProxy implements StorageProxyMBean
* is unclear we want to mix those latencies with read latencies, so this
* may be a bit involved.
*/
- private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
+ private static InetAddress findSuitableEndpoint(String tableName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
{
+ Table table = Table.open(tableName);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
if (endpoints.isEmpty())
@@ -869,21 +876,22 @@ public class StorageProxy implements StorageProxyMBean
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
+ Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
- command.key);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
+ List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
+ CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
+ endpoints = consistency_level.filterForQuery(table, endpoints, cfm.newReadRepairDecision());
RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
- ReadCallback<ReadResponse, Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+ ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
handler.assureSufficientLiveNodes();
- assert !handler.endpoints.isEmpty();
+ assert !endpoints.isEmpty();
readCallbacks[i] = handler;
// The data-request message is sent to dataPoint, the node that will actually get the data for us
- InetAddress dataPoint = handler.endpoints.get(0);
+ InetAddress dataPoint = endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
logger.trace("reading data locally");
@@ -895,14 +903,14 @@ public class StorageProxy implements StorageProxyMBean
MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
}
- if (handler.endpoints.size() == 1)
+ if (endpoints.size() == 1)
continue;
// send the other endpoints a digest request
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
MessageOut message = null;
- for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
+ for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
@@ -996,7 +1004,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (TimeoutException e)
{
- int blockFor = consistency_level.blockFor(command.getKeyspace());
+ int blockFor = consistency_level.blockFor(Table.open(command.getKeyspace()));
throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
}
@@ -1071,13 +1079,28 @@ public class StorageProxy implements StorageProxyMBean
}
}
- static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
+ private static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
{
- if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- {
- return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
- }
- return new ReadCallback(resolver, consistencyLevel, command, endpoints);
+ return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
+ }
+
+ /**
+ * Returns the live natural endpoints for the given ring position, ordered by the configured
+ * snitch's proximity to this node's broadcast address.
+ *
+ * @param table keyspace used to look up the natural endpoints
+ * @param pos ring position whose replicas are wanted
+ * @return the live replicas, sorted in place by proximity
+ */
+ private static List<InetAddress> getLiveSortedEndpoints(Table table, RingPosition pos)
+ {
+ List<InetAddress> sorted = StorageService.instance.getLiveNaturalEndpoints(table, pos);
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ snitch.sortByProximity(FBUtilities.getBroadcastAddress(), sorted);
+ return sorted;
+ }
+
+ /**
+ * Returns a new list holding the elements of l1 that are also contained in l2, in l1's order.
+ * Neither argument is modified: the work is done on a copy of l1.
+ */
+ private static List<InetAddress> intersection(List<InetAddress> l1, List<InetAddress> l2)
+ {
+ // Note: we don't use Guava Sets.intersection() for 3 reasons:
+ // 1) retainAll would be inefficient if l1 and l2 were large, but in practice both are the replicas for a range and
+ // so will be very small (< RF). In that case, retainAll is in fact more efficient.
+ // 2) we ultimately need a list, so converting everything to sets doesn't make sense
+ // 3) l1 and l2 are sorted by proximity. Using retainAll maintains that sorting in the result, while using sets wouldn't.
+ List<InetAddress> inter = new ArrayList<InetAddress>(l1);
+ inter.retainAll(l2);
+ return inter;
}
public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
@@ -1086,6 +1109,8 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Determining replicas to query");
logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
long startTime = System.nanoTime();
+
+ Table table = Table.open(command.keyspace);
List<Row> rows;
// now scan until we have enough results
try
@@ -1095,8 +1120,51 @@ public class StorageProxy implements StorageProxyMBean
int cql3RowCount = 0;
rows = new ArrayList<Row>();
List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
- for (AbstractBounds<RowPosition> range : ranges)
+ int i = 0;
+ AbstractBounds<RowPosition> nextRange = null;
+ List<InetAddress> nextEndpoints = null;
+ List<InetAddress> nextFilteredEndpoints = null;
+ while (i < ranges.size())
{
+ AbstractBounds<RowPosition> range = nextRange == null
+ ? ranges.get(i)
+ : nextRange;
+ List<InetAddress> liveEndpoints = nextEndpoints == null
+ ? getLiveSortedEndpoints(table, range.right)
+ : nextEndpoints;
+ List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
+ ? consistency_level.filterForQuery(table, liveEndpoints)
+ : nextFilteredEndpoints;
+ ++i;
+
+ // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
+ // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+ // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
+ // Greedily try to fold the following ranges into the current one.
+ while (i < ranges.size())
+ {
+ nextRange = ranges.get(i);
+ nextEndpoints = getLiveSortedEndpoints(table, nextRange.right);
+ // Filter the NEXT range's replicas, not the current range's: this list is compared against in the
+ // merge heuristic below and is reused as filteredEndpoints by the outer loop if merging stops here.
+ nextFilteredEndpoints = consistency_level.filterForQuery(table, nextEndpoints);
+
+ // Only replicas live for both ranges can serve a merged range.
+ List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
+
+ // Check if there are enough endpoints for the merge to be possible.
+ if (!consistency_level.isSufficientLiveNodes(table, merged))
+ break;
+
+ List<InetAddress> filteredMerged = consistency_level.filterForQuery(table, merged);
+
+ // Estimate whether merging will be a win or not
+ if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
+ break;
+
+ // If we get there, merge this range and the next one
+ range = range.withNewRight(nextRange.right);
+ liveEndpoints = merged;
+ filteredEndpoints = filteredMerged;
+ ++i;
+ }
+
+
RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
command.column_family,
command.super_column,
@@ -1107,16 +1175,13 @@ public class StorageProxy implements StorageProxyMBean
command.countCQL3Rows,
command.isPaging);
- List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
-
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
+ ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
handler.assureSufficientLiveNodes();
- resolver.setSources(handler.endpoints);
- if (handler.endpoints.size() == 1
- && handler.endpoints.get(0).equals(FBUtilities.getBroadcastAddress())
+ resolver.setSources(filteredEndpoints);
+ if (filteredEndpoints.size() == 1
+ && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
&& OPTIMIZE_LOCAL_REQUESTS)
{
logger.trace("reading data locally");
@@ -1125,7 +1190,7 @@ public class StorageProxy implements StorageProxyMBean
else
{
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
- for (InetAddress endpoint : handler.endpoints)
+ for (InetAddress endpoint : filteredEndpoints)
{
MessagingService.instance().sendRR(message, endpoint, handler);
logger.trace("reading {} from {}", nodeCmd, endpoint);
@@ -1147,7 +1212,7 @@ public class StorageProxy implements StorageProxyMBean
{
logger.debug("Range slice timeout: {}", ex.toString());
// We actually got all response at that point
- int blockFor = consistency_level.blockFor(command.keyspace);
+ int blockFor = consistency_level.blockFor(table);
throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
}
catch (DigestMismatchException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eb62032..e57cfe4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2491,14 +2491,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param key key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
- public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key)
+ public List<InetAddress> getLiveNaturalEndpoints(Table table, ByteBuffer key)
{
return getLiveNaturalEndpoints(table, getPartitioner().decorateKey(key));
}
- public List<InetAddress> getLiveNaturalEndpoints(String table, RingPosition pos)
+ public List<InetAddress> getLiveNaturalEndpoints(Table table, RingPosition pos)
{
- List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(pos);
+ List<InetAddress> endpoints = table.getReplicationStrategy().getNaturalEndpoints(pos);
List<InetAddress> liveEps = new ArrayList<InetAddress>(endpoints.size());
for (InetAddress endpoint : endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index ab7223a..1c229d5 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.MessageIn;
@@ -41,32 +42,26 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
protected final AtomicInteger responses;
- private final int blockFor;
public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
Collection<InetAddress> pendingEndpoints,
ConsistencyLevel consistencyLevel,
- String table,
+ Table table,
Runnable callback,
WriteType writeType)
{
- super(writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
- blockFor = consistencyLevel.blockFor(table);
- responses = new AtomicInteger(blockFor);
+ super(table, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+ responses = new AtomicInteger(consistencyLevel.blockFor(table));
}
public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
{
- super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, callback, writeType);
- blockFor = 1;
- responses = new AtomicInteger(1);
+ this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType);
}
public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
{
- super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, null, writeType);
- blockFor = 1;
- responses = new AtomicInteger(1);
+ this(endpoint, writeType, null);
}
public void response(MessageIn m)
@@ -77,33 +72,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
protected int ackCount()
{
- return blockFor - responses.get();
- }
-
- protected int blockForCL()
- {
- return blockFor;
- }
-
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- if (consistencyLevel == ConsistencyLevel.ANY)
- {
- // local hint is acceptable, and local node is always live
- return;
- }
-
- // count destinations that are part of the desired target set
- int liveNodes = 0;
- for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
- {
- if (FailureDetector.instance.isAlive(destination))
- liveNodes++;
- }
- if (liveNodes < blockFor)
- {
- throw new UnavailableException(consistencyLevel, blockFor, liveNodes);
- }
+ return consistencyLevel.blockFor(table) - responses.get();
}
public boolean isLatencyForSnitch()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index bbb382b..8212468 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -164,7 +164,7 @@ public class ReplicationStrategyEndpointCacheTest extends SchemaLoader
private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
{
return AbstractReplicationStrategy.createReplicationStrategy(
- strategy.table,
+ strategy.tableName,
strategy.getClass().getName(),
newTmd,
strategy.snitch,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52671128/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index 5f108ee..a457df8 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -165,16 +165,6 @@ public class SimpleStrategyTest extends SchemaLoader
StorageServiceAccessor.setTokenMetadata(oldTmd);
}
- private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
- {
- return AbstractReplicationStrategy.createReplicationStrategy(
- strategy.table,
- strategy.getClass().getName(),
- newTmd,
- strategy.snitch,
- strategy.configOptions);
- }
-
private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
{
KSMetaData ksmd = Schema.instance.getKSMetaData(table);