You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/02/04 16:21:33 UTC
svn commit: r906521 [2/3] - in /incubator/cassandra/trunk: ./ conf/
contrib/circuit/src/org/apache/cassandra/contrib/circuit/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apa...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Feb 4 15:21:31 2010
@@ -23,7 +23,11 @@
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.net.InetAddress;
@@ -109,9 +113,9 @@
return partitioner_;
}
- public Collection<Range> getLocalRanges()
+ public Collection<Range> getLocalRanges(String table)
{
- return getRangesForEndPoint(FBUtilities.getLocalAddress());
+ return getRangesForEndPoint(table, FBUtilities.getLocalAddress());
}
public Range getLocalPrimaryRange()
@@ -119,14 +123,6 @@
return getPrimaryRangeForEndPoint(FBUtilities.getLocalAddress());
}
- /*
- * This is the endpoint snitch which depends on the network architecture. We
- * need to keep this information for each endpoint so that we make decisions
- * while doing things like replication etc.
- *
- */
- private IEndPointSnitch endPointSnitch_;
-
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata_ = new TokenMetadata();
private SystemTable.StorageMetadata storageMetadata_;
@@ -140,20 +136,21 @@
new NamedThreadFactory("CONSISTENCY-MANAGER"));
/* We use this interface to determine where replicas need to be placed */
- private AbstractReplicationStrategy replicationStrategy_;
+ private Map<String, AbstractReplicationStrategy> replicationStrategies = new HashMap<String, AbstractReplicationStrategy>();
+
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
private Multimap<InetAddress, String> bootstrapSet;
/* when intialized as a client, we shouldn't write to the system table. */
private boolean isClientMode;
-
+
public synchronized void addBootstrapSource(InetAddress s, String table)
{
if (logger_.isDebugEnabled())
- logger_.debug("Added " + s + " as a bootstrap source");
+ logger_.debug(String.format("Added %s/%s as a bootstrap source", s, table));
bootstrapSet.put(s, table);
}
-
+
public synchronized void removeBootstrapSource(InetAddress s, String table)
{
if (table == null)
@@ -161,7 +158,7 @@
else
bootstrapSet.remove(s, table);
if (logger_.isDebugEnabled())
- logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet.keySet(), ", ") + "]");
+ logger_.debug(String.format("Removed %s/%s as a bootstrap source; remaining is [%s]", s, table == null ? "<ALL>" : table, StringUtils.join(bootstrapSet.keySet(), ", ")));
if (bootstrapSet.isEmpty())
{
@@ -200,7 +197,6 @@
}
bootstrapSet = HashMultimap.create();
- endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
MessagingService.instance.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
@@ -222,19 +218,30 @@
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler());
+ }
- replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
+ public synchronized AbstractReplicationStrategy getReplicationStrategy(String table)
+ {
+ AbstractReplicationStrategy strat = replicationStrategies.get(table);
+ if (strat == null)
+ {
+ strat = StorageService.getReplicationStrategy(tokenMetadata_, table);
+ replicationStrategies.put(table, strat);
+ }
+ return strat;
}
- public static AbstractReplicationStrategy getReplicationStrategy(TokenMetadata tokenMetadata)
+ public static AbstractReplicationStrategy getReplicationStrategy(TokenMetadata tokenMetadata, String table)
{
AbstractReplicationStrategy replicationStrategy = null;
- Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
- Class [] parameterTypes = new Class[] { TokenMetadata.class, int.class};
+ Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass(table);
+ if (cls == null)
+ throw new RuntimeException(String.format("No replica strategy configured for %s", table));
+ Class [] parameterTypes = new Class[] { TokenMetadata.class, IEndPointSnitch.class};
try
{
Constructor<AbstractReplicationStrategy> constructor = cls.getConstructor(parameterTypes);
- replicationStrategy = constructor.newInstance(tokenMetadata, DatabaseDescriptor.getReplicationFactor());
+ replicationStrategy = constructor.newInstance(tokenMetadata, DatabaseDescriptor.getEndPointSnitch(table));
}
catch (Exception e)
{
@@ -258,7 +265,7 @@
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
}
-
+
public void initServer() throws IOException
{
isClientMode = false;
@@ -327,7 +334,7 @@
{
throw new AssertionError(e);
}
- new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
+ new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
}
public boolean isBootstrapMode()
@@ -339,12 +346,7 @@
{
return tokenMetadata_;
}
-
- public IEndPointSnitch getEndPointSnitch()
- {
- return endPointSnitch_;
- }
-
+
/**
* This method performs the requisite operations to make
* sure that the N replicas are in sync. We do this in the
@@ -355,12 +357,28 @@
consistencyManager_.submit(new ConsistencyManager(command.table, row, endpoints, command));
}
- public Map<Range, List<String>> getRangeToEndPointMap()
+ /**
+ * for a keyspace, return the ranges and corresponding hosts for a given keyspace.
+ * @param keyspace
+ * @return
+ */
+ public Map<Range, List<String>> getRangeToEndPointMap(String keyspace)
{
+ // some people just want to get a visual representation of things. Allow null and set it to the first
+ // non-system table.
+ if (keyspace == null)
+ {
+ for (String ks : DatabaseDescriptor.getNonSystemTables())
+ {
+ keyspace = ks;
+ break;
+ }
+ }
+
/* All the ranges for the tokens */
List<Range> ranges = getAllRanges(tokenMetadata_.sortedTokens());
Map<Range, List<String>> map = new HashMap<Range, List<String>>();
- for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(ranges).entrySet())
+ for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(keyspace, ranges, keyspace).entrySet())
{
map.put(entry.getKey(), stringify(entry.getValue()));
}
@@ -368,17 +386,17 @@
}
/**
- * Construct the range to endpoint mapping based on the true view
- * of the world.
+ * Construct the range to endpoint mapping based on the true view
+ * of the world.
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
- public Map<Range, List<InetAddress>> constructRangeToEndPointMap(List<Range> ranges)
+ private Map<Range, List<InetAddress>> constructRangeToEndPointMap(String keyspace, List<Range> ranges, String table)
{
Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
for (Range range : ranges)
{
- rangeToEndPointMap.put(range, replicationStrategy_.getNaturalEndpoints(range.right));
+ rangeToEndPointMap.put(range, getReplicationStrategy(keyspace).getNaturalEndpoints(range.right, table));
}
return rangeToEndPointMap;
}
@@ -593,12 +611,14 @@
*/
private void calculatePendingRanges()
{
- calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ calculatePendingRanges(getReplicationStrategy(table), table);
}
// public & static for testing purposes
- public static void calculatePendingRanges(TokenMetadata tm, AbstractReplicationStrategy strategy)
+ public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table)
{
+ TokenMetadata tm = StorageService.instance.getTokenMetadata();
Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
@@ -606,12 +626,12 @@
if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
{
if (logger_.isDebugEnabled())
- logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges");
- tm.setPendingRanges(pendingRanges);
+ logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges for " + table);
+ tm.setPendingRanges(table, pendingRanges);
return;
}
- Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
+ Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges(table);
// Copy of metadata reflecting the situation after all leave operations are finished.
TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
@@ -625,8 +645,8 @@
// all leaving nodes are gone.
for (Range range : affectedRanges)
{
- List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right, tm);
- List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata);
+ List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right, tm, table);
+ List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata, table);
newEndPoints.removeAll(currentEndPoints);
pendingRanges.putAll(range, newEndPoints);
}
@@ -641,19 +661,19 @@
InetAddress endPoint = entry.getValue();
allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
- for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+ for (Range range : strategy.getAddressRanges(allLeftMetadata, table).get(endPoint))
pendingRanges.put(range, endPoint);
allLeftMetadata.removeEndpoint(endPoint);
}
- tm.setPendingRanges(pendingRanges);
+ tm.setPendingRanges(table, pendingRanges);
if (logger_.isDebugEnabled())
logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
}
/**
- * Called when endPoint is removed from the ring without proper
+ * Called when an endPoint is removed from the ring without proper
* STATE_LEAVING -> STATE_LEFT sequence. This function checks
* whether this node becomes responsible for new ranges as a
* consequence and streams data if needed.
@@ -667,57 +687,66 @@
{
InetAddress myAddress = FBUtilities.getLocalAddress();
- // get all ranges that change ownership (that is, a node needs
- // to take responsibility for new range)
- Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(endPoint);
-
- // check if any of these ranges are coming our way
- Set<Range> myNewRanges = new HashSet<Range>();
- for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+ for (String table : DatabaseDescriptor.getNonSystemTables())
{
- if (entry.getValue().equals(myAddress))
- myNewRanges.add(entry.getKey());
- }
-
- if (!myNewRanges.isEmpty())
- {
- if (logger_.isDebugEnabled())
- logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
-
- Multimap<Range, InetAddress> rangeAddresses = replicationStrategy_.getRangeAddresses(tokenMetadata_);
- Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
- IFailureDetector failureDetector = FailureDetector.instance;
+ // get all ranges that change ownership (that is, a node needs
+ // to take responsibility for new range)
+ Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(table, endPoint);
+
+ // check if any of these ranges are coming our way
+ Set<Range> myNewRanges = new HashSet<Range>();
+ for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+ {
+ if (entry.getValue().equals(myAddress))
+ myNewRanges.add(entry.getKey());
+ }
- // find alive sources for our new ranges
- for (Range myNewRange : myNewRanges)
+ if (!myNewRanges.isEmpty())
{
- List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch().getSortedListByProximity(myAddress, rangeAddresses.get(myNewRange));
+ if (logger_.isDebugEnabled())
+ logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
- assert (!sources.contains(myAddress));
+ Multimap<Range, InetAddress> rangeAddresses = getReplicationStrategy(table).getRangeAddresses(tokenMetadata_, table);
+ Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+ IFailureDetector failureDetector = FailureDetector.instance;
- for (InetAddress source : sources)
+ // find alive sources for our new ranges
+ for (Range myNewRange : myNewRanges)
{
- if (source.equals(endPoint))
- continue;
+ List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(myAddress, rangeAddresses.get(myNewRange));
- if (failureDetector.isAlive(source))
+ assert (!sources.contains(myAddress));
+
+ for (InetAddress source : sources)
{
- sourceRanges.put(source, myNewRange);
- break;
+ if (source.equals(endPoint))
+ continue;
+
+ if (failureDetector.isAlive(source))
+ {
+ sourceRanges.put(source, myNewRange);
+ break;
+ }
}
}
- }
- // Finally we have a list of addresses and ranges to stream. Proceed to stream
- for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
- StreamIn.requestRanges(entry.getKey(), entry.getValue());
+ // Finally we have a list of addresses and ranges to
+ // stream. Proceed to stream
+ for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Requesting from " + entry.getKey() + " ranges " + StringUtils.join(entry.getValue(), ", "));
+ StreamIn.requestRanges(entry.getKey(), table, entry.getValue());
+ }
+ }
}
}
- private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
+ // needs to be modified to accept either a table or ARS.
+ private Multimap<Range, InetAddress> getChangedRangesForLeaving(String table, InetAddress endpoint)
{
// First get all ranges the leaving endpoint is responsible for
- Collection<Range> ranges = getRangesForEndPoint(endpoint);
+ Collection<Range> ranges = getRangesForEndPoint(table, endpoint);
if (logger_.isDebugEnabled())
logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
@@ -726,7 +755,7 @@
// Find (for each range) all nodes that store replicas for these ranges as well
for (Range range : ranges)
- currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right, tokenMetadata_));
+ currentReplicaEndpoints.put(range, getReplicationStrategy(table).getNaturalEndpoints(range.right, tokenMetadata_, table));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
@@ -744,7 +773,7 @@
// range.
for (Range range : ranges)
{
- ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right, temp);
+ ArrayList<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).getNaturalEndpoints(range.right, temp, table);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
if (newReplicaEndpoints.isEmpty())
@@ -872,14 +901,14 @@
public void forceTableCleanup() throws IOException
{
- for (Table table : Table.all())
+ List<String> tables = DatabaseDescriptor.getNonSystemTables();
+ for (String tName : tables)
{
- if (table.name.equals(Table.SYSTEM_TABLE))
- continue;
+ Table table = Table.open(tName);
table.forceCleanup();
}
}
-
+
public void forceTableCompaction() throws IOException
{
for (Table table : Table.all())
@@ -888,7 +917,7 @@
/**
* Takes the snapshot for a given table.
- *
+ *
* @param tableName the name of the table.
* @param tag the tag given to the snapshot (null is permissible)
*/
@@ -909,7 +938,7 @@
/**
* Takes a snapshot for every table.
- *
+ *
* @param tag the tag given to the snapshot (null is permissible)
*/
public void takeAllSnapshot(String tag) throws IOException
@@ -925,6 +954,7 @@
{
for (Table table : Table.all())
table.clearSnapshot();
+
if (logger_.isDebugEnabled())
logger_.debug("Cleared out all snapshot directories");
}
@@ -975,7 +1005,7 @@
{
// request that all relevant endpoints generate trees
final MessagingService ms = MessagingService.instance;
- final List<InetAddress> endpoints = getNaturalEndpoints(getLocalToken());
+ final List<InetAddress> endpoints = getNaturalEndpoints(tableName, getLocalToken());
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
{
Message request = TreeRequestVerbHandler.makeVerb(tableName, cfStore.getColumnFamilyName());
@@ -985,7 +1015,7 @@
}
/* End of MBean interface methods */
-
+
/**
* This method returns the predecessor of the endpoint ep on the identifier
* space.
@@ -1015,20 +1045,20 @@
{
return tokenMetadata_.getPrimaryRangeFor(tokenMetadata_.getToken(ep));
}
-
+
/**
* Get all ranges an endpoint is responsible for.
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
- Collection<Range> getRangesForEndPoint(InetAddress ep)
+ Collection<Range> getRangesForEndPoint(String table, InetAddress ep)
{
- return replicationStrategy_.getAddressRanges().get(ep);
+ return getReplicationStrategy(table).getAddressRanges(table).get(ep);
}
-
+
/**
* Get all ranges that span the ring given a set
- * of tokens. All ranges are in sorted order of
+ * of tokens. All ranges are in sorted order of
* ranges.
* @return ranges in sorted order
*/
@@ -1108,10 +1138,10 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<InetAddress> getNaturalEndpoints(String key)
+ public List<InetAddress> getNaturalEndpoints(String table, String key)
{
- return getNaturalEndpoints(partitioner_.getToken(key));
- }
+ return getNaturalEndpoints(table, partitioner_.getToken(key));
+ }
/**
* This method returns the N endpoints that are responsible for storing the
@@ -1120,11 +1150,11 @@
* @param token - token for which we need to find the endpoint return value -
* the endpoint responsible for this token
*/
- public List<InetAddress> getNaturalEndpoints(Token token)
+ public List<InetAddress> getNaturalEndpoints(String table, Token token)
{
- return replicationStrategy_.getNaturalEndpoints(token);
- }
-
+ return getReplicationStrategy(table).getNaturalEndpoints(token, table);
+ }
+
/**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1132,15 +1162,15 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<InetAddress> getLiveNaturalEndpoints(String key)
+ public List<InetAddress> getLiveNaturalEndpoints(String table, String key)
{
- return getLiveNaturalEndpoints(partitioner_.getToken(key));
+ return getLiveNaturalEndpoints(table, partitioner_.getToken(key));
}
- public List<InetAddress> getLiveNaturalEndpoints(Token token)
+ public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
- List<InetAddress> endpoints = replicationStrategy_.getNaturalEndpoints(token);
+ List<InetAddress> endpoints = getReplicationStrategy(table).getNaturalEndpoints(token, table);
for (InetAddress endpoint : endpoints)
{
@@ -1158,18 +1188,18 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public Map<InetAddress, InetAddress> getHintedEndpointMap(String key, List<InetAddress> naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedEndpointMap(String table, String key, List<InetAddress> naturalEndpoints)
{
- return replicationStrategy_.getHintedEndpoints(partitioner_.getToken(key), naturalEndpoints);
+ return getReplicationStrategy(table).getHintedEndpoints(partitioner_.getToken(key), table, naturalEndpoints);
}
/**
* This function finds the closest live endpoint that contains a given key.
*/
- public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
+ public InetAddress findSuitableEndPoint(String table, String key) throws IOException, UnavailableException
{
- List<InetAddress> endpoints = getNaturalEndpoints(key);
- endPointSnitch_.sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+ List<InetAddress> endpoints = getNaturalEndpoints(table, key);
+ DatabaseDescriptor.getEndPointSnitch(table).sortByProximity(FBUtilities.getLocalAddress(), endpoints);
for (InetAddress endpoint : endpoints)
{
if (FailureDetector.instance.isAlive(endpoint))
@@ -1201,7 +1231,7 @@
* There will be 1 more token than splits requested. So for splits of 2, tokens T1 T2 T3 will be returned,
* where (T1, T2] is the first range and (T2, T3] is the second. The first token will always be the left
* Token of this node's primary range, and the last will always be the Right token of that range.
- */
+ */
public List<String> getSplits(int splits)
{
assert splits > 1;
@@ -1260,8 +1290,11 @@
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
- if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
- throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ {
+ if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+ throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+ }
logger_.info("DECOMMISSIONING");
startLeaving();
@@ -1302,44 +1335,56 @@
private void unbootstrap(final Runnable onFinish)
{
- Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
- if (logger_.isDebugEnabled())
- logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
-
- if (rangesMM.isEmpty())
+ final CountDownLatch latch = new CountDownLatch(DatabaseDescriptor.getNonSystemTables().size());
+ for (final String table : DatabaseDescriptor.getNonSystemTables())
{
- // nothing needs transfer, so leave immediately. this can happen when replication factor == number of nodes.
- leaveRing();
- onFinish.run();
- return;
- }
+ Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getLocalAddress());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
+ if (rangesMM.isEmpty())
+ {
+ latch.countDown();
+ continue;
+ }
- final Set<Map.Entry<Range, InetAddress>> pending = new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries());
- for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
- {
- final Range range = entry.getKey();
- final InetAddress newEndpoint = entry.getValue();
- final Runnable callback = new Runnable()
+ final Set<Map.Entry<Range, InetAddress>> pending = Collections.synchronizedSet(new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries()));
+ for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
{
- public synchronized void run()
+ final Range range = entry.getKey();
+ final InetAddress newEndpoint = entry.getValue();
+ final Runnable callback = new Runnable()
{
- pending.remove(entry);
- if (pending.isEmpty())
+ public void run()
{
- leaveRing();
- onFinish.run();
+ pending.remove(entry);
+ if (pending.isEmpty())
+ latch.countDown();
}
- }
- };
- StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
- {
- public void run()
+ };
+ StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
{
- // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
- StreamOut.transferRanges(newEndpoint, Arrays.asList(range), callback);
- }
- });
+ public void run()
+ {
+ // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
+ StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback);
+ }
+ });
+ }
+ }
+
+ // wait for the transfer runnables to signal the latch.
+ logger_.debug("waiting for stream aks.");
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
}
+ logger_.debug("stream acks all received.");
+ leaveRing();
+ onFinish.run();
}
public void move(String newToken) throws InterruptedException
@@ -1359,8 +1404,11 @@
*/
private void move(final Token token) throws InterruptedException
{
- if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
- throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+ for (String table : DatabaseDescriptor.getTables())
+ {
+ if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+ throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+ }
logger_.info("starting move. leaving token " + getLocalToken());
startLeaving();
@@ -1409,19 +1457,14 @@
// to add new AP state for this command, but that would again
// increase the amount of data to be gossiped in the cluster -
// not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
- // between removetoken command and normal state left, so it is
+ // between ``removetoken command and normal state left, so it is
// not so bad.
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
}
- public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
{
- return replicationStrategy_.getWriteResponseHandler(blockFor, consistency_level);
- }
-
- public AbstractReplicationStrategy getReplicationStrategy()
- {
- return replicationStrategy_;
+ return getReplicationStrategy(table).getWriteResponseHandler(blockFor, consistency_level, table);
}
public boolean isClientMode()
@@ -1430,11 +1473,11 @@
}
// Never ever do this at home. Used by tests.
- AbstractReplicationStrategy setReplicationStrategyUnsafe(AbstractReplicationStrategy newStrategy)
+ Map<String, AbstractReplicationStrategy> setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy> replacement)
{
- AbstractReplicationStrategy oldStrategy = replicationStrategy_;
- replicationStrategy_ = newStrategy;
- return oldStrategy;
+ Map<String, AbstractReplicationStrategy> old = replicationStrategies;
+ replicationStrategies = replacement;
+ return old;
}
// Never ever do this at home. Used by tests.
@@ -1445,4 +1488,11 @@
return oldPartitioner;
}
+ TokenMetadata setTokenMetadataUnsafe(TokenMetadata tmd)
+ {
+ TokenMetadata old = tokenMetadata_;
+ tokenMetadata_ = tmd;
+ return old;
+ }
+
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Feb 4 15:21:31 2010
@@ -60,7 +60,7 @@
*
* @return mapping of ranges to end points
*/
- public Map<Range, List<String>> getRangeToEndPointMap();
+ public Map<Range, List<String>> getRangeToEndPointMap(String keyspace);
/** Human-readable load value */
public String getLoadString();
@@ -82,7 +82,7 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<InetAddress> getNaturalEndpoints(String key);
+ public List<InetAddress> getNaturalEndpoints(String key, String table);
/**
* Forces major compaction (all sstable files compacted)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Thu Feb 4 15:21:31 2010
@@ -41,11 +41,11 @@
protected int localResponses;
private final long startTime;
- public WriteResponseHandler(int responseCount)
+ public WriteResponseHandler(int responseCount, String table)
{
// at most one node per range can bootstrap at a time, and these will be added to the write until
// bootstrap finishes (at which point we no longer need to write to the old ones).
- assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor()
+ assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor(table)
: "invalid response count " + responseCount;
this.responseCount = responseCount;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Thu Feb 4 15:21:31 2010
@@ -1,30 +1,30 @@
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.Collection;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/** for streaming data from other nodes in to this one */
-public class StreamIn
-{
- private static Logger logger = Logger.getLogger(StreamOut.class);
-
- /**
- * Request ranges to be transferred from source to local node
- */
- public static void requestRanges(InetAddress source, Collection<Range> ranges)
- {
- if (logger.isDebugEnabled())
- logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
- StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
- Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
- MessagingService.instance.sendOneWay(message, source);
- }
-}
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/** for streaming data from other nodes in to this one */
+public class StreamIn
+{
+ private static Logger logger = Logger.getLogger(StreamOut.class);
+
+ /**
+ * Request ranges to be transferred from source to local node
+ */
+ public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
+ StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName);
+ Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+ MessagingService.instance.sendOneWay(message, source);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Thu Feb 4 15:21:31 2010
@@ -63,7 +63,7 @@
/**
* Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
*/
- public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
+ public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback)
{
assert ranges.size() > 0;
@@ -75,36 +75,34 @@
* (2) anticompaction -- split out the keys in the range specified
* (3) transfer the data.
*/
- for (Table table : Table.all())
+ try
{
- try
+ Table table = Table.open(tableName);
+ if (logger.isDebugEnabled())
+ logger.debug("Flushing memtables ...");
+ for (Future f : table.flush())
{
- if (logger.isDebugEnabled())
- logger.debug("Flushing memtables ...");
- for (Future f : table.flush())
+ try
{
- try
- {
- f.get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ f.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
}
- if (logger.isDebugEnabled())
- logger.debug("Performing anticompaction ...");
- /* Get the list of files that need to be streamed */
- transferSSTables(target, table.forceAntiCompaction(ranges, target), table.name); // SSTR GC deletes the file when done
- }
- catch (IOException e)
- {
- throw new IOError(e);
}
+ if (logger.isDebugEnabled())
+ logger.debug("Performing anticompaction ...");
+ /* Get the list of files that need to be streamed */
+ transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName); // SSTR GC deletes the file when done
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
}
if (callback != null)
callback.run();
@@ -127,7 +125,7 @@
}
}
if (logger.isDebugEnabled())
- logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", "));
+ logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Thu Feb 4 15:21:31 2010
@@ -1,80 +1,86 @@
-package org.apache.cassandra.streaming;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-
-/**
- * This encapsulates information of the list of ranges that a target
- * node requires to be transferred. This will be bundled in a
- * StreamRequestsMessage and sent to nodes that are going to handoff
- * the data.
-*/
-class StreamRequestMetadata
-{
- private static ICompactSerializer<StreamRequestMetadata> serializer_;
- static
- {
- serializer_ = new StreamRequestMetadataSerializer();
- }
-
- protected static ICompactSerializer<StreamRequestMetadata> serializer()
- {
- return serializer_;
- }
-
- protected InetAddress target_;
- protected Collection<Range> ranges_;
-
- StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
- {
- target_ = target;
- ranges_ = ranges;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder("");
- sb.append(target_);
- sb.append("------->");
- for ( Range range : ranges_ )
- {
- sb.append(range);
- sb.append(" ");
- }
- return sb.toString();
- }
-
- private static class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
- {
- public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
- {
- CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
- dos.writeInt(srMetadata.ranges_.size());
- for (Range range : srMetadata.ranges_)
- {
- Range.serializer().serialize(range, dos);
- }
- }
-
- public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
- {
- InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
- int size = dis.readInt();
- List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
- for( int i = 0; i < size; ++i )
- {
- ranges.add(Range.serializer().deserialize(dis));
- }
- return new StreamRequestMetadata( target, ranges );
- }
- }
-}
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+
+/**
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestsMessage and sent to nodes that are going to handoff
+ * the data.
+*/
+class StreamRequestMetadata
+{
+ private static ICompactSerializer<StreamRequestMetadata> serializer_;
+ static
+ {
+ serializer_ = new StreamRequestMetadataSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ protected InetAddress target_;
+ protected Collection<Range> ranges_;
+ protected String table_;
+
+ StreamRequestMetadata(InetAddress target, Collection<Range> ranges, String table)
+ {
+ target_ = target;
+ ranges_ = ranges;
+ table_ = table;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ sb.append(table_);
+ sb.append("@");
+ sb.append(target_);
+ sb.append("------->");
+ for ( Range range : ranges_ )
+ {
+ sb.append(range);
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+}
+
+class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
+{
+ public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
+ {
+ CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
+ dos.writeUTF(srMetadata.table_);
+ dos.writeInt(srMetadata.ranges_.size());
+ for (Range range : srMetadata.ranges_)
+ {
+ Range.serializer().serialize(range, dos);
+ }
+ }
+
+ public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
+ {
+ InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
+ String table = dis.readUTF();
+ int size = dis.readInt();
+ List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+ for( int i = 0; i < size; ++i )
+ {
+ ranges.add(Range.serializer().deserialize(dis));
+ }
+ return new StreamRequestMetadata(target, ranges, table);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Thu Feb 4 15:21:31 2010
@@ -52,7 +52,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug(srm.toString());
- StreamOut.transferRanges(srm.target_, srm.ranges_, null);
+ StreamOut.transferRanges(srm.target_, srm.table_, srm.ranges_, null);
}
}
catch (IOException ex)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java Thu Feb 4 15:21:31 2010
@@ -144,9 +144,9 @@
hf.printHelp(usage, "", options, header);
}
- public void printEndPoints(String key)
+ public void printEndPoints(String key, String table)
{
- List<InetAddress> endpoints = probe.getEndPoints(key);
+ List<InetAddress> endpoints = probe.getEndPoints(key, table);
System.out.println(String.format("%-17s: %s", "Key", key));
System.out.println(String.format("%-17s: %s", "Endpoints", endpoints));
}
@@ -254,21 +254,21 @@
String cmdName = arguments[0];
if (cmdName.equals("get_endpoints"))
{
- if (arguments.length <= 1)
+ if (arguments.length <= 2)
{
- System.err.println("missing key argument");
+ System.err.println("missing key and/or table argument");
}
- clusterCmd.printEndPoints(arguments[1]);
+ clusterCmd.printEndPoints(arguments[1], arguments[2]);
}
- else if (cmdName.equals("global_snapshot"))
- {
+ else if (cmdName.equals("global_snapshot"))
+ {
String snapshotName = "";
if (arguments.length > 1)
{
snapshotName = arguments[1];
}
- clusterCmd.takeGlobalSnapshot(snapshotName);
- }
+ clusterCmd.takeGlobalSnapshot(snapshotName);
+ }
else if (cmdName.equals("clear_global_snapshot"))
{
clusterCmd.clearGlobalSnapshot();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Feb 4 15:21:31 2010
@@ -66,7 +66,7 @@
*/
public void printRing(PrintStream outs)
{
- Map<Range, List<String>> rangeMap = probe.getRangeToEndPointMap();
+ Map<Range, List<String>> rangeMap = probe.getRangeToEndPointMap(null);
List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
Collections.sort(ranges);
Set<String> liveNodes = probe.getLiveNodes();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Feb 4 15:21:31 2010
@@ -26,6 +26,8 @@
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -139,15 +141,84 @@
ssProxy.forceTableRepair(tableName, columnFamilies);
}
- public Map<Range, List<String>> getRangeToEndPointMap()
+ public Map<Range, List<String>> getRangeToEndPointMap(String tableName)
{
- return ssProxy.getRangeToEndPointMap();
+ return ssProxy.getRangeToEndPointMap(tableName);
}
public Set<String> getLiveNodes()
{
return ssProxy.getLiveNodes();
}
+
+ /**
+ * Write a textual representation of the Cassandra ring.
+ *
+ * @param outs the stream to write to
+ */
+ public void printRing(PrintStream outs)
+ {
+ Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap(null);
+ List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
+ Collections.sort(ranges);
+ Set<String> liveNodes = ssProxy.getLiveNodes();
+ Set<String> deadNodes = ssProxy.getUnreachableNodes();
+ Map<String, String> loadMap = ssProxy.getLoadMap();
+
+ // Print range-to-endpoint mapping
+ int counter = 0;
+ outs.print(String.format("%-14s", "Address"));
+ outs.print(String.format("%-11s", "Status"));
+ outs.print(String.format("%-14s", "Load"));
+ outs.print(String.format("%-43s", "Range"));
+ outs.println("Ring");
+ // emphasize that we're showing the right part of each range
+ if (ranges.size() > 1)
+ {
+ outs.println(String.format("%-14s%-11s%-14s%-43s", "", "", "", ranges.get(0).left));
+ }
+ // normal range & node info
+ for (Range range : ranges) {
+ List<String> endpoints = rangeMap.get(range);
+ String primaryEndpoint = endpoints.get(0);
+
+ outs.print(String.format("%-14s", primaryEndpoint));
+
+ String status = liveNodes.contains(primaryEndpoint)
+ ? "Up"
+ : deadNodes.contains(primaryEndpoint)
+ ? "Down"
+ : "?";
+ outs.print(String.format("%-11s", status));
+
+ String load = loadMap.containsKey(primaryEndpoint) ? loadMap.get(primaryEndpoint) : "?";
+ outs.print(String.format("%-14s", load));
+
+ outs.print(String.format("%-43s", range.right));
+
+ String asciiRingArt;
+ if (counter == 0)
+ {
+ asciiRingArt = "|<--|";
+ }
+ else if (counter == (rangeMap.size() - 1))
+ {
+ asciiRingArt = "|-->|";
+ }
+ else
+ {
+ if ((rangeMap.size() > 4) && ((counter % 2) == 0))
+ asciiRingArt = "v |";
+ else if ((rangeMap.size() > 4) && ((counter % 2) != 0))
+ asciiRingArt = "| ^";
+ else
+ asciiRingArt = "| |";
+ }
+ outs.println(asciiRingArt);
+
+ counter++;
+ }
+ }
public Set<String> getUnreachableNodes()
{
@@ -325,9 +396,9 @@
}
}
- public List<InetAddress> getEndPoints(String key)
+ public List<InetAddress> getEndPoints(String key, String table)
{
- return ssProxy.getNaturalEndpoints(key);
+ return ssProxy.getNaturalEndpoints(key, table);
}
}
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Thu Feb 4 15:21:31 2010
@@ -23,9 +23,6 @@
<CommitLogSync>batch</CommitLogSync>
<CommitLogSyncBatchWindowInMS>1.0</CommitLogSyncBatchWindowInMS>
<Partitioner>org.apache.cassandra.dht.CollatingOrderPreservingPartitioner</Partitioner>
- <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
- <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
- <ReplicationFactor>1</ReplicationFactor>
<RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
<ListenAddress>127.0.0.1</ListenAddress>
<StoragePort>7010</StoragePort>
@@ -51,12 +48,33 @@
<ColumnFamily ColumnType="Super" CompareSubcolumnsWith="LongType" Name="Super2"/>
<ColumnFamily ColumnType="Super" CompareSubcolumnsWith="LongType" Name="Super3"/>
<ColumnFamily ColumnType="Super" CompareSubcolumnsWith="UTF8Type" Name="Super4"/>
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
<Keyspace Name = "Keyspace2">
<ColumnFamily Name="Standard1"/>
<ColumnFamily Name="Standard3"/>
<ColumnFamily ColumnType="Super" Name="Super3"/>
<ColumnFamily ColumnType="Super" CompareSubcolumnsWith="TimeUUIDType" Name="Super4"/>
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace>
+ <Keyspace Name = "Keyspace3">
+ <ColumnFamily Name="Standard1"/>
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>5</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace>
+ <Keyspace Name = "Keyspace4">
+ <ColumnFamily Name="Standard1"/>
+ <ColumnFamily Name="Standard3"/>
+ <ColumnFamily ColumnType="Super" Name="Super3"/>
+ <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="TimeUUIDType" Name="Super4"/>
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>3</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
</Keyspaces>
<Seeds>
Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Thu Feb 4 15:21:31 2010
@@ -873,7 +873,7 @@
def test_describe_keyspace(self):
""" Test keyspace description """
kspaces = client.get_string_list_property("keyspaces")
- assert len(kspaces) == 3, kspaces
+ assert len(kspaces) == 5, kspaces # ['Keyspace1', 'Keyspace2', 'Keyspace3', 'Keyspace4', 'system']
ks1 = client.describe_keyspace("Keyspace1")
assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4'])
sysks = client.describe_keyspace("system")
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java Thu Feb 4 15:21:31 2010
@@ -59,30 +59,51 @@
}
/**
- * usage: java -Dstorage-config="confpath" org.apache.cassandra.client.TestRingCache
+ * usage: java -Dstorage-config="confpath" org.apache.cassandra.client.TestRingCache [keyspace row-id-prefix row-id-int]
+ * to test a single keyspace/row, use the parameters. row-id-prefix and row-id-int are appended together to form a
+ * single row id. If you supply now parameters, 'Keyspace1' is assumed and will check 9 rows ('row1' through 'row9').
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Throwable
{
- String table = "Keyspace1";
- for (int nRows=1; nRows<10; nRows++)
+ String table;
+ int minRow;
+ int maxRow;
+ String rowPrefix;
+ if (args.length > 0)
{
- String row = "row" + nRows;
+ table = args[0];
+ rowPrefix = args[1];
+ minRow = Integer.parseInt(args[2]);
+ maxRow = minRow + 1;
+ }
+ else
+ {
+ table = "Keyspace1";
+ minRow = 1;
+ maxRow = 10;
+ rowPrefix = "row";
+ }
+
+ for (int nRows = minRow; nRows < maxRow; nRows++)
+ {
+ String row = rowPrefix + nRows;
ColumnPath col = createColumnPath("Standard1", null, "col1".getBytes());
- List<InetAddress> endPoints = ringCache.getEndPoint(row);
+ List<InetAddress> endPoints = ringCache.getEndPoint(table, row);
String hosts="";
for (int i = 0; i < endPoints.size(); i++)
hosts = hosts + ((i > 0) ? "," : "") + endPoints.get(i);
System.out.println("hosts with key " + row + " : " + hosts + "; choose " + endPoints.get(0));
-
+
// now, read the row back directly from the host owning the row locally
setup(endPoints.get(0).getHostAddress(), DatabaseDescriptor.getThriftPort());
thriftClient.insert(table, row, col, "val1".getBytes(), 1, ConsistencyLevel.ONE);
Column column=thriftClient.get(table, row, col, ConsistencyLevel.ONE).column;
System.out.println("read row " + row + " " + new String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);
}
+
System.exit(1);
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java Thu Feb 4 15:21:31 2010
@@ -34,8 +34,8 @@
* TODO: A more general method of property modification would be useful, but
* will probably have to wait for a refactor away from all the static fields.
*/
- public static void setReplicationFactor(int factor)
+ public static void setReplicationFactor(String table, int factor)
{
- DatabaseDescriptor.setReplicationFactorUnsafe(factor);
+ DatabaseDescriptor.setReplicationFactorUnsafe(table, factor);
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Thu Feb 4 15:21:31 2010
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.commons.lang.StringUtils;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
@@ -71,12 +72,17 @@
@Test
public void testSourceTargetComputation() throws UnknownHostException
{
- testSourceTargetComputation(1);
- testSourceTargetComputation(3);
- testSourceTargetComputation(100);
+ final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100};
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ {
+ int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
+ for (int clusterSize : clusterSizes)
+ if (clusterSize >= replicationFactor)
+ testSourceTargetComputation(table, clusterSize, replicationFactor);
+ }
}
- private void testSourceTargetComputation(int numOldNodes) throws UnknownHostException
+ private void testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
{
StorageService ss = StorageService.instance;
@@ -86,8 +92,8 @@
TokenMetadata tmd = ss.getTokenMetadata();
assertEquals(numOldNodes, tmd.sortedTokens().size());
- BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
- Multimap<Range, InetAddress> res = b.getRangesWithSources();
+ BootStrapper b = new BootStrapper(myEndpoint, myToken, tmd);
+ Multimap<Range, InetAddress> res = b.getRangesWithSources(table);
int transferCount = 0;
for (Map.Entry<Range, Collection<InetAddress>> e : res.asMap().entrySet())
@@ -96,8 +102,7 @@
transferCount++;
}
- /* Only 1 transfer from old node to new node */
- assertEquals(1, transferCount);
+ assertEquals(replicationFactor, transferCount);
IFailureDetector mockFailureDetector = new IFailureDetector()
{
public boolean isAlive(InetAddress ep)
@@ -112,8 +117,10 @@
public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
};
Multimap<InetAddress, Range> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
- assertEquals(1, temp.keySet().size());
- assertEquals(1, temp.asMap().values().iterator().next().size());
+ // there isn't any point in testing the size of these collections for any specific size. When a random partitioner
+ // is used, they will vary.
+ assert temp.keySet().size() > 0;
+ assert temp.asMap().values().iterator().next().size() > 0;
assert !temp.keySet().iterator().next().equals(myEndpoint);
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Thu Feb 4 15:21:31 2010
@@ -25,6 +25,9 @@
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.service.StorageServiceAccessor;
import org.junit.Test;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
@@ -40,10 +43,26 @@
public class RackUnawareStrategyTest
{
@Test
+ public void tryBogusTable()
+ {
+ AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy("Keyspace1");
+ assertNotNull(rs);
+ try
+ {
+ rs = StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
+ throw new AssertionError("SS.getReplicationStrategy() should have thrown a RuntimeException.");
+ }
+ catch (RuntimeException ex)
+ {
+ // This exception should be thrown.
+ }
+ }
+
+ @Test
public void testBigIntegerEndpoints() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
- AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
List<Token> endPointTokens = new ArrayList<Token>();
List<Token> keyTokens = new ArrayList<Token>();
@@ -51,7 +70,8 @@
endPointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
}
- testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
}
@Test
@@ -59,7 +79,7 @@
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new OrderPreservingPartitioner();
- AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
List<Token> endPointTokens = new ArrayList<Token>();
List<Token> keyTokens = new ArrayList<Token>();
@@ -67,12 +87,13 @@
endPointTokens.add(new StringToken(String.valueOf((char)('a' + i * 2))));
keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i * 2 + 1))));
}
- testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
}
// given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
// make sure that the Strategy picks the right endpoints for the keys.
- private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens) throws UnknownHostException
+ private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens, String table) throws UnknownHostException
{
List<InetAddress> hosts = new ArrayList<InetAddress>();
for (int i = 0; i < endPointTokens.length; i++)
@@ -84,8 +105,8 @@
for (int i = 0; i < keyTokens.length; i++)
{
- List<InetAddress> endPoints = strategy.getNaturalEndpoints(keyTokens[i]);
- assertEquals(3, endPoints.size());
+ List<InetAddress> endPoints = strategy.getNaturalEndpoints(keyTokens[i], table);
+ assertEquals(DatabaseDescriptor.getReplicationFactor(table), endPoints.size());
for (int j = 0; j < endPoints.size(); j++)
{
assertEquals(endPoints.get(j), hosts.get((i + j + 1) % hosts.size()));
@@ -96,16 +117,19 @@
@Test
public void testGetEndpointsDuringBootstrap() throws UnknownHostException
{
+ // the token difference will be RING_SIZE * 2.
+ final int RING_SIZE = 10;
TokenMetadata tmd = new TokenMetadata();
- AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+ TokenMetadata oldTmd = StorageServiceAccessor.setTokenMetadata(tmd);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
- Token[] endPointTokens = new Token[5];
- Token[] keyTokens = new Token[5];
+ Token[] endPointTokens = new Token[RING_SIZE];
+ Token[] keyTokens = new Token[RING_SIZE];
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < RING_SIZE; i++)
{
- endPointTokens[i] = new BigIntegerToken(String.valueOf(10 * i));
- keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
+ endPointTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i));
+ keyTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i + RING_SIZE));
}
List<InetAddress> hosts = new ArrayList<InetAddress>();
@@ -115,28 +139,36 @@
tmd.updateNormalToken(endPointTokens[i], ep);
hosts.add(ep);
}
-
- //Add bootstrap node id=6
- Token bsToken = new BigIntegerToken(String.valueOf(25));
- InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
+
+ // bootstrap at the end of the ring
+ Token bsToken = new BigIntegerToken(String.valueOf(210));
+ InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.11");
tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
- StorageService.calculatePendingRanges(tmd, strategy);
- for (int i = 0; i < keyTokens.length; i++)
+ for (String table : DatabaseDescriptor.getNonSystemTables())
{
- Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], strategy.getNaturalEndpoints(keyTokens[i]));
- assertTrue(endPoints.size() >= 3);
+ StorageService.calculatePendingRanges(strategy, table);
+ int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
- for (int j = 0; j < 3; j++)
+ for (int i = 0; i < keyTokens.length; i++)
{
- //Check that the old nodes are definitely included
- assertTrue(endPoints.contains(hosts.get((i + j + 1) % hosts.size())));
+ Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], table, strategy.getNaturalEndpoints(keyTokens[i], table));
+ assertTrue(endPoints.size() >= replicationFactor);
+
+ for (int j = 0; j < replicationFactor; j++)
+ {
+ //Check that the old nodes are definitely included
+ assertTrue(endPoints.contains(hosts.get((i + j + 1) % hosts.size())));
+ }
+
+ // bootstrapEndPoint should be in the endPoints for i in MAX-RF to MAX, but not in any earlier ep.
+ if (i < RING_SIZE - replicationFactor)
+ assertFalse(endPoints.contains(bootstrapEndPoint));
+ else
+ assertTrue(endPoints.contains(bootstrapEndPoint));
}
- // for 5, 15, 25 this should include bootstrap node
- if (i < 3)
- assertTrue(endPoints.contains(bootstrapEndPoint));
- else
- assertFalse(endPoints.contains(bootstrapEndPoint));
}
+
+ StorageServiceAccessor.setTokenMetadata(oldTmd);
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Thu Feb 4 15:21:31 2010
@@ -61,8 +61,9 @@
if (!initialized)
{
LOCAL = FBUtilities.getLocalAddress();
+ tablename = DatabaseDescriptor.getTables().iterator().next();
// bump the replication factor so that local overlaps with REMOTE below
- DatabaseDescriptorTest.setReplicationFactor(2);
+ DatabaseDescriptorTest.setReplicationFactor(tablename, 2);
StorageService.instance.initServer();
// generate a fake endpoint for which we can spoof receiving/sending trees
@@ -72,7 +73,6 @@
tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
assert tmd.isMember(REMOTE);
- tablename = DatabaseDescriptor.getTables().iterator().next();
cfname = Table.open(tablename).getColumnFamilies().iterator().next();
initialized = true;
}
@@ -89,7 +89,7 @@
@Test
public void testGetValidator() throws Throwable
{
- aes.clearNaturalRepairs();
+ aes.clearNaturalRepairs_TestsOnly();
// not major
assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
@@ -174,13 +174,13 @@
Util.writeColumnFamily(rms);
ColumnFamilyStore store = Util.writeColumnFamily(rms);
- TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
+ TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
// force a readonly compaction, and wait for it to finish
CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
// check that a tree was created and stored
flushAES().get(5000, TimeUnit.MILLISECONDS);
- assert old != aes.getRendezvousPair(tablename, cfname, REMOTE);
+ assert old != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
}
@Test
@@ -200,7 +200,7 @@
// confirm that our reference is not equal to the original due
// to (de)serialization
- assert tree != aes.getRendezvousPair(tablename, cfname, REMOTE).left;
+ assert tree != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE).left;
}
@Test