You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/07/30 19:00:29 UTC
[12/17] git commit: Remove shuffle/taketoken
Remove shuffle/taketoken
Patch by brandonwilliams, reviewed by thobbs for CASSANDRA-7601
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc3e0dbd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc3e0dbd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc3e0dbd
Branch: refs/heads/trunk
Commit: cc3e0dbda284b814e492c620b7875f2b75c03e91
Parents: 38c27b0
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jul 30 11:53:34 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jul 30 11:53:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
debian/cassandra.install | 1 -
.../apache/cassandra/gms/VersionedValue.java | 7 -
.../apache/cassandra/locator/TokenMetadata.java | 102 +--
.../service/PendingRangeCalculatorService.java | 16 +-
.../ScheduledRangeTransferExecutorService.java | 136 ----
.../cassandra/service/StorageService.java | 149 +---
.../cassandra/service/StorageServiceMBean.java | 10 -
.../org/apache/cassandra/tools/NodeProbe.java | 5 -
.../org/apache/cassandra/tools/NodeTool.java | 21 -
.../org/apache/cassandra/tools/Shuffle.java | 722 -------------------
.../apache/cassandra/service/RelocateTest.java | 205 ------
13 files changed, 7 insertions(+), 1371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d7caf9..c4772a7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-final
+ * Remove shuffle and taketoken (CASSANDRA-7601)
* Clean up Windows batch scripts (CASSANDRA-7619)
* Fix native protocol drop user type notification (CASSANDRA-7571)
* Give read access to system.schema_usertypes to all authenticated users
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index c35bc11..3cbe3d1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -78,6 +78,9 @@ New features
- If you are using Leveled Compaction, you can now disable doing size-tiered
compaction in L0 by starting Cassandra with -Dcassandra.disable_stcs_in_l0
(see CASSANDRA-6621 for details).
+ - Shuffle and taketoken have been removed. For clusters that choose to
+ upgrade to vnodes, creating a new datacenter with vnodes and migrating is
+ recommended. See http://goo.gl/Sna2S1 for further information.
2.0.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index 5b6a81b..a4654d1 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -18,7 +18,6 @@ bin/sstableloader usr/bin
bin/cqlsh usr/bin
bin/sstablescrub usr/bin
bin/sstableupgrade usr/bin
-bin/cassandra-shuffle usr/bin
tools/bin/cassandra-stress usr/bin
lib/*.jar usr/share/cassandra/lib
lib/*.zip usr/share/cassandra/lib
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 206a52b..36c2f00 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -65,7 +65,6 @@ public class VersionedValue implements Comparable<VersionedValue>
public final static String STATUS_LEAVING = "LEAVING";
public final static String STATUS_LEFT = "LEFT";
public final static String STATUS_MOVING = "MOVING";
- public final static String STATUS_RELOCATING = "RELOCATING";
public final static String REMOVING_TOKEN = "removing";
public final static String REMOVED_TOKEN = "removed";
@@ -169,12 +168,6 @@ public class VersionedValue implements Comparable<VersionedValue>
return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue relocating(Collection<Token> srcTokens)
- {
- return new VersionedValue(
- versionString(VersionedValue.STATUS_RELOCATING, StringUtils.join(srcTokens, VersionedValue.DELIMITER)));
- }
-
public VersionedValue hostId(UUID hostId)
{
return new VersionedValue(hostId.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index f41b1e7..f848e3b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -87,9 +87,6 @@ public class TokenMetadata
// nodes which are migrating to the new tokens in the ring
private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
- // tokens which are migrating to new endpoints
- private final ConcurrentMap<Token, InetAddress> relocatingTokens = new ConcurrentHashMap<Token, InetAddress>();
-
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private volatile ArrayList<Token> sortedTokens;
@@ -388,33 +385,6 @@ public class TokenMetadata
}
}
- /**
- * Add new relocating ranges (tokens moving from their respective endpoints, to another).
- * @param tokens tokens being moved
- * @param endpoint destination of moves
- */
- public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint)
- {
- assert endpoint != null;
- assert tokens != null && tokens.size() > 0;
-
- lock.writeLock().lock();
-
- try
- {
- for (Token token : tokens)
- {
- InetAddress prev = relocatingTokens.put(token, endpoint);
- if (prev != null && !prev.equals(endpoint))
- logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev});
- }
- }
- finally
- {
- lock.writeLock().unlock();
- }
- }
-
public void removeEndpoint(InetAddress endpoint)
{
assert endpoint != null;
@@ -464,38 +434,6 @@ public class TokenMetadata
}
}
- /**
- * Remove pair of token/address from relocating ranges.
- * @param endpoint
- */
- public void removeFromRelocating(Token token, InetAddress endpoint)
- {
- assert endpoint != null;
- assert token != null;
-
- lock.writeLock().lock();
-
- try
- {
- InetAddress previous = relocatingTokens.remove(token);
-
- if (previous == null)
- {
- logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token);
- }
- else if (!previous.equals(endpoint))
- {
- logger.warn(
- "Removal of relocating token {} with mismatched endpoint ({} != {})",
- new Object[]{token, endpoint, previous});
- }
- }
- finally
- {
- lock.writeLock().unlock();
- }
- }
-
public Collection<Token> getTokens(InetAddress endpoint)
{
assert endpoint != null;
@@ -570,22 +508,6 @@ public class TokenMetadata
}
}
- public boolean isRelocating(Token token)
- {
- assert token != null;
-
- lock.readLock().lock();
-
- try
- {
- return relocatingTokens.containsKey(token);
- }
- finally
- {
- lock.readLock().unlock();
- }
- }
-
private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
/**
@@ -658,7 +580,7 @@ public class TokenMetadata
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
- * current leave, move, and relocate operations have finished.
+ * current leave, and move operations have finished.
*
* @return new token metadata
*/
@@ -677,9 +599,6 @@ public class TokenMetadata
for (Pair<Token, InetAddress> pair : movingEndpoints)
metadata.updateNormalToken(pair.left, pair.right);
- for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet())
- metadata.updateNormalToken(relocating.getKey(), relocating.getValue());
-
return metadata;
}
finally
@@ -831,15 +750,6 @@ public class TokenMetadata
}
}
- /**
- * Ranges which are migrating to new endpoints.
- * @return set of token-address pairs of relocating ranges
- */
- public Map<Token, InetAddress> getRelocatingRanges()
- {
- return relocatingTokens;
- }
-
public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
@@ -994,16 +904,6 @@ public class TokenMetadata
return sb.toString();
}
- public String printRelocatingRanges()
- {
- StringBuilder sb = new StringBuilder();
-
- for (Map.Entry<Token, InetAddress> entry : relocatingTokens.entrySet())
- sb.append(String.format("%s:%s%n", entry.getKey(), entry.getValue()));
-
- return sb.toString();
- }
-
public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
{
Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index abf8df2..063633f 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -122,7 +122,7 @@ public class PendingRangeCalculatorService
BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
{
if (logger.isDebugEnabled())
logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
@@ -185,20 +185,6 @@ public class PendingRangeCalculatorService
allLeftMetadata.removeEndpoint(endpoint);
}
- // Ranges being relocated.
- for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
- {
- InetAddress endpoint = relocating.getValue(); // address of the moving node
- Token token = relocating.getKey();
-
- allLeftMetadata.updateNormalToken(token, endpoint);
-
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- pendingRanges.put(range, endpoint);
-
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
tm.setPendingRanges(keyspaceName, pendingRanges);
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
deleted file mode 100644
index 860619a..0000000
--- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ScheduledRangeTransferExecutorService
-{
- private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class);
- private static final int INTERVAL = 10;
- private ScheduledExecutorService scheduler;
-
- public void setup()
- {
- if (DatabaseDescriptor.getNumTokens() == 1)
- {
- LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled");
- return;
- }
-
- scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory());
- scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS);
- LOG.info("Enabling scheduled transfers of token ranges");
- }
-
- public void tearDown()
- {
- if (scheduler == null)
- {
- LOG.warn("Unable to shutdown; Scheduler never enabled");
- return;
- }
-
- LOG.info("Shutting down range transfer scheduler");
- scheduler.shutdownNow();
- }
-}
-
-class RangeTransfer implements Runnable
-{
- private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class);
-
- public void run()
- {
- UntypedResultSet res = executeInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
-
- if (res.size() < 1)
- {
- LOG.info("No queued ranges to transfer, shuffle complete. Run 'cassandra-shuffle disable' to stop this message.");
- return;
- }
-
- if (!isReady())
- return;
-
- UntypedResultSet.Row row = res.iterator().next();
-
- Date requestedAt = row.getTimestamp("requested_at");
- ByteBuffer tokenBytes = row.getBytes("token_bytes");
- Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes);
-
- LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString());
- try
- {
- StorageService.instance.relocateTokens(Collections.singleton(token));
- }
- catch (Exception e)
- {
- LOG.error("Error removing {}: {}", token, e);
- }
- finally
- {
- LOG.debug("Removing queued entry for transfer of {}", token);
- executeInternal(String.format("DELETE FROM system.%s WHERE token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes);
- }
- }
-
- private boolean isReady()
- {
- int targetTokens = DatabaseDescriptor.getNumTokens();
- int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10));
- int actualTokens = StorageService.instance.getTokens().size();
-
- if (actualTokens >= highMark)
- {
- LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens);
- return false;
- }
-
- return true;
- }
-}
-
-class RangeTransferThreadFactory implements ThreadFactory
-{
- private AtomicInteger count = new AtomicInteger(0);
-
- public Thread newThread(Runnable r)
- {
- Thread rangeXferThread = new Thread(r);
- rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement()));
- return rangeXferThread;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8ace958..56c9308 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -182,7 +182,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
private double tracingProbability = 0.0;
- private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING }
+ private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
private Mode operationMode = Mode.STARTING;
/* Used for tracking drain progress */
@@ -190,8 +190,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
- private static final ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
-
private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
@@ -1370,8 +1368,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* Other STATUS values that may be seen (possibly anywhere in the normal progression):
* STATUS_MOVING,newtoken
* set if node is currently moving to a new token in the ring
- * STATUS_RELOCATING,srcToken,srcToken,srcToken,...
- * set if the endpoint is in the process of relocating a token to itself
* REMOVING_TOKEN,deadtoken
* set if the node is dead and is being removed by its REMOVAL_COORDINATOR
* REMOVED_TOKEN,deadtoken
@@ -1411,9 +1407,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
case VersionedValue.STATUS_MOVING:
handleStateMoving(endpoint, pieces);
break;
- case VersionedValue.STATUS_RELOCATING:
- handleStateRelocating(endpoint, pieces);
- break;
}
}
else
@@ -1626,33 +1619,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (!isClientMode)
tokensToUpdateInSystemKeyspace.add(token);
}
- else if (tokenMetadata.isRelocating(token) && tokenMetadata.getRelocatingRanges().get(token).equals(endpoint))
- {
- // Token was relocating, this is the bookkeeping that makes it official.
- tokensToUpdateInMetadata.add(token);
- if (!isClientMode)
- tokensToUpdateInSystemKeyspace.add(token);
-
- optionalTasks.schedule(new Runnable()
- {
- public void run()
- {
- logger.info("Removing RELOCATION state for {} {}", endpoint, token);
- getTokenMetadata().removeFromRelocating(token, endpoint);
- }
- }, RING_DELAY, TimeUnit.MILLISECONDS);
-
- // We used to own this token; This token will need to be removed from system.local
- if (currentOwner.equals(FBUtilities.getBroadcastAddress()))
- localTokensToRemove.add(token);
-
- logger.info("Token {} relocated to {}", token, endpoint);
- }
- else if (tokenMetadata.isRelocating(token))
- {
- logger.info("Token {} is relocating to {}, ignoring update from {}",
- token, tokenMetadata.getRelocatingRanges().get(token), endpoint);
- }
else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
{
tokensToUpdateInMetadata.add(token);
@@ -1671,8 +1637,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
currentOwner,
token,
endpoint));
- if (logger.isDebugEnabled())
- logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges());
}
else
{
@@ -1681,8 +1645,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
currentOwner,
token,
endpoint));
- if (logger.isDebugEnabled())
- logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges());
}
}
@@ -1779,26 +1741,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Handle one or more ranges (tokens) moving from their respective endpoints, to another.
- *
- * @param endpoint the destination of the move
- * @param pieces STATE_RELOCATING,token,token,...
- */
- private void handleStateRelocating(InetAddress endpoint, String[] pieces)
- {
- assert pieces.length >= 2;
-
- List<Token> tokens = new ArrayList<>(pieces.length - 1);
- for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length))
- tokens.add(getPartitioner().getTokenFactory().fromString(tStr));
-
- logger.debug("Tokens {} are relocating to {}", tokens, endpoint);
- tokenMetadata.addRelocatingTokens(tokens, endpoint);
-
- PendingRangeCalculatorService.instance.update();
- }
-
- /**
* Handle notification that a node being actively removed from the ring via 'removenode'
*
* @param endpoint node
@@ -3390,85 +3332,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public void relocate(Collection<String> srcTokens) throws IOException
- {
- List<Token> tokens = new ArrayList<>(srcTokens.size());
- try
- {
- for (String srcT : srcTokens)
- {
- getPartitioner().getTokenFactory().validate(srcT);
- Token token = getPartitioner().getTokenFactory().fromString(srcT);
- if (tokenMetadata.getTokens(tokenMetadata.getEndpoint(token)).size() < 2)
- throw new IOException("Cannot relocate " + srcT + "; source node would have no tokens left");
- tokens.add(getPartitioner().getTokenFactory().fromString(srcT));
- }
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e.getMessage());
- }
- relocateTokens(tokens);
- }
-
- void relocateTokens(Collection<Token> srcTokens)
- {
- assert srcTokens != null;
- InetAddress localAddress = FBUtilities.getBroadcastAddress();
- Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress);
- Set<Token> tokens = new HashSet<>(srcTokens);
-
- Iterator<Token> it = tokens.iterator();
- while (it.hasNext())
- {
- Token srcT = it.next();
- if (localTokens.contains(srcT))
- {
- it.remove();
- logger.warn("cannot move {}; source and destination match", srcT);
- }
- }
-
- if (tokens.size() < 1)
- {
- logger.warn("no valid token arguments specified; nothing to relocate");
- return;
- }
-
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.relocating(tokens));
- setMode(Mode.RELOCATING, String.format("relocating %s to %s", tokens, localAddress.getHostAddress()), true);
-
- List<String> keyspaceNames = Schema.instance.getNonSystemKeyspaces();
-
- setMode(Mode.RELOCATING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true);
- Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
-
- RangeRelocator relocator = new RangeRelocator(tokens, keyspaceNames);
-
- if (relocator.streamsNeeded())
- {
- setMode(Mode.RELOCATING, "fetching new ranges and streaming old ranges", true);
- try
- {
- relocator.stream().get();
- }
- catch (ExecutionException | InterruptedException e)
- {
- throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
- }
- }
- else
- {
- setMode(Mode.RELOCATING, "no new ranges to stream/fetch", true);
- }
-
- Collection<Token> currentTokens = SystemKeyspace.updateLocalTokens(tokens, Collections.<Token>emptyList());
- tokenMetadata.updateNormalTokens(currentTokens, FBUtilities.getBroadcastAddress());
- Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(currentTokens));
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(currentTokens));
- setMode(Mode.NORMAL, false);
- }
-
/**
* Get the status of a token removal.
*/
@@ -4088,16 +3951,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return tracingProbability;
}
- public void enableScheduledRangeXfers()
- {
- rangeXferExecutor.setup();
- }
-
- public void disableScheduledRangeXfers()
- {
- rangeXferExecutor.tearDown();
- }
-
public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException
{
for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8bd57b8..320f7dc 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -309,11 +309,6 @@ public interface StorageServiceMBean extends NotificationEmitter
public void move(String newToken) throws IOException;
/**
- * @param srcTokens tokens to move to this node
- */
- public void relocate(Collection<String> srcTokens) throws IOException;
-
- /**
* removeToken removes token (and all data associated with
* enpoint that had it) from the ring
*/
@@ -497,11 +492,6 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public double getTracingProbability();
- /** Begin processing of queued range transfers. */
- public void enableScheduledRangeXfers();
- /** Disable processing of queued range transfers. */
- public void disableScheduledRangeXfers();
-
void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index a60ab84..7de5a77 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -512,11 +512,6 @@ public class NodeProbe implements AutoCloseable
ssProxy.move(newToken);
}
- public void takeTokens(List<String> tokens) throws IOException
- {
- ssProxy.relocate(tokens);
- }
-
public void removeNode(String token)
{
ssProxy.removeNode(token);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 3ec0df3..63d48c4 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -139,7 +139,6 @@ public class NodeTool
Drain.class,
TruncateHints.class,
TpStats.class,
- TakeToken.class,
SetLoggingLevel.class,
GetLoggingLevels.class
);
@@ -1409,26 +1408,6 @@ public class NodeTool
}
}
- @Command(name = "taketoken", description = "Move the token(s) from the existing owner(s) to this node. For vnodes only.")
- public static class TakeToken extends NodeToolCmd
- {
- @Arguments(usage = "<token, ...>", description = "Token(s) to take", required = true)
- private List<String> tokens = new ArrayList<String>();
-
- @Override
- public void execute(NodeProbe probe)
- {
- try
- {
- probe.takeTokens(tokens);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error taking tokens", e);
- }
- }
- }
-
@Command(name = "join", description = "Join the ring")
public static class Join extends NodeToolCmd
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/src/java/org/apache/cassandra/tools/Shuffle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java
deleted file mode 100644
index 7cd9d31..0000000
--- a/src/java/org/apache/cassandra/tools/Shuffle.java
+++ /dev/null
@@ -1,722 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.cassandra.tools;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import javax.management.JMX;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.MissingArgumentException;
-
-import org.apache.cassandra.serializers.TimestampSerializer;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
-import org.apache.cassandra.service.StorageServiceMBean;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-
-public class Shuffle extends AbstractJmxClient
-{
- private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";
- private static final String epSnitchObjName = "org.apache.cassandra.db:type=EndpointSnitchInfo";
-
- private StorageServiceMBean ssProxy = null;
- private Random rand = new Random(System.currentTimeMillis());
- private final String thriftHost;
- private final int thriftPort;
- private final boolean thriftFramed;
- private final String thriftUsername;
- private final String thriftPassword;
-
- static
- {
- addCmdOption("th", "thrift-host", true, "Thrift hostname or IP address (Default: JMX host)");
- addCmdOption("tp", "thrift-port", true, "Thrift port number (Default: 9160)");
- addCmdOption("tf", "thrift-framed", false, "Enable framed transport for Thrift (Default: false)");
- addCmdOption("tu", "thrift-user", true, "Thrift username");
- addCmdOption("tpw", "thrift-password", true, "Thrift password");
- addCmdOption("en", "and-enable", true, "Immediately enable shuffling (create only)");
- addCmdOption("dc", "only-dc", true, "Apply only to named DC (create only)");
- }
-
- public Shuffle(String host,
- int port,
- String thriftHost,
- int thriftPort,
- boolean thriftFramed,
- String jmxUsername,
- String jmxPassword,
- String thriftUsername,
- String thriftPassword) throws IOException
- {
- super(host, port, jmxUsername, jmxPassword);
-
- this.thriftHost = thriftHost;
- this.thriftPort = thriftPort;
- this.thriftFramed = thriftFramed;
- this.thriftUsername = thriftUsername;
- this.thriftPassword = thriftPassword;
-
- // Setup the StorageService proxy.
- ssProxy = getSSProxy(jmxConn.getMbeanServerConn());
- }
-
- private StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn)
- {
- StorageServiceMBean proxy;
- try
- {
- ObjectName name = new ObjectName(ssObjName);
- proxy = JMX.newMBeanProxy(mbeanConn, name, StorageServiceMBean.class);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException(e);
- }
- return proxy;
- }
-
- private EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn)
- {
- EndpointSnitchInfoMBean proxy;
- try
- {
- ObjectName name = new ObjectName(epSnitchObjName);
- proxy = JMX.newMBeanProxy(mbeanConn, name, EndpointSnitchInfoMBean.class);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException(e);
- }
- return proxy;
- }
-
- /**
- * Given a Multimap of endpoint to tokens, return a new randomized mapping.
- *
- * @param endpointMap current mapping of endpoint to tokens
- * @return a new mapping of endpoint to tokens
- */
- private Multimap<String, String> calculateRelocations(Multimap<String, String> endpointMap)
- {
- Multimap<String, String> relocations = HashMultimap.create();
- Set<String> endpoints = new HashSet<>(endpointMap.keySet());
- Map<String, Integer> endpointToNumTokens = new HashMap<>(endpoints.size());
- Map<String, Iterator<String>> iterMap = new HashMap<>(endpoints.size());
-
- // Create maps of endpoint to token iterators, and endpoint to number of tokens.
- for (String endpoint : endpoints)
- {
- try
- {
- endpointToNumTokens.put(endpoint, ssProxy.getTokens(endpoint).size());
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException("What that...?", e);
- }
-
- iterMap.put(endpoint, endpointMap.get(endpoint).iterator());
- }
-
- int epsToComplete = endpoints.size();
- Set<String> endpointsCompleted = new HashSet<>();
-
- while (true)
- {
- endpoints.removeAll(endpointsCompleted);
-
- for (String endpoint : endpoints)
- {
- boolean choiceMade = false;
-
- if (!iterMap.get(endpoint).hasNext())
- {
- endpointsCompleted.add(endpoint);
- continue;
- }
-
- String token = iterMap.get(endpoint).next();
-
- List<String> subSet = new ArrayList<>(endpoints);
- subSet.remove(endpoint);
- Collections.shuffle(subSet, rand);
-
- for (String choice : subSet)
- {
- if (relocations.get(choice).size() < endpointToNumTokens.get(choice))
- {
- relocations.put(choice, token);
- choiceMade = true;
- break;
- }
- }
-
- if (!choiceMade)
- relocations.put(endpoint, token);
- }
-
- // We're done when we've exhausted all of the token iterators
- if (endpointsCompleted.size() == epsToComplete)
- break;
- }
-
- return relocations;
- }
-
- /**
- * Enable relocations.
- *
- * @param endpoints Collection of hostname or IP strings
- */
- private void enableRelocations(Collection<String> endpoints)
- {
- for (String endpoint : endpoints)
- {
- try
- {
- JMXConnection conn = new JMXConnection(endpoint, port, username, password);
- getSSProxy(conn.getMbeanServerConn()).enableScheduledRangeXfers();
- conn.close();
- }
- catch (IOException e)
- {
- writeln("Failed to enable shuffling on %s!", endpoint);
- }
- }
- }
-
- /**
- * Disable relocations.
- *
- * @param endpoints Collection of hostname or IP strings
- */
- private void disableRelocations(Collection<String> endpoints)
- {
- for (String endpoint : endpoints)
- {
- try
- {
- JMXConnection conn = new JMXConnection(endpoint, port, username, password);
- getSSProxy(conn.getMbeanServerConn()).disableScheduledRangeXfers();
- conn.close();
- }
- catch (IOException e)
- {
- writeln("Failed to enable shuffling on %s!", endpoint);
- }
- }
- }
-
- /**
- * Return a list of the live nodes (using JMX).
- *
- * @return String endpoint names
- * @throws ShuffleError
- */
- private Collection<String> getLiveNodes() throws ShuffleError
- {
- try
- {
- JMXConnection conn = new JMXConnection(host, port, username, password);
- return getSSProxy(conn.getMbeanServerConn()).getLiveNodes();
- }
- catch (IOException e)
- {
- throw new ShuffleError("Error retrieving list of nodes from JMX interface");
- }
- }
-
- /**
- * Create and distribute a new, randomized token to endpoint mapping.
- *
- * @throws ShuffleError on handled exceptions
- */
- public void shuffle(boolean enable, String onlyDc) throws ShuffleError
- {
- Map<String, String> tokenMap;
- Multimap<String, String> endpointMap = HashMultimap.create();
- EndpointSnitchInfoMBean epSnitchProxy = getEpSnitchProxy(jmxConn.getMbeanServerConn());
-
- try
- {
- CassandraClient seedClient = getThriftClient(thriftHost);
- tokenMap = seedClient.describe_token_map();
-
- for (Map.Entry<String, String> entry : tokenMap.entrySet())
- {
- String endpoint = entry.getValue(), token = entry.getKey();
- try
- {
- if (onlyDc != null)
- {
- if (onlyDc.equals(epSnitchProxy.getDatacenter(endpoint)))
- endpointMap.put(endpoint, token);
- }
- else
- endpointMap.put(endpoint, token);
- }
- catch (UnknownHostException e)
- {
- writeln("Warning: %s unknown to EndpointSnitch!", endpoint);
- }
- }
- }
- catch (InvalidRequestException ire)
- {
- throw new RuntimeException("What that...?", ire);
- }
- catch (TException e)
- {
- throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage()));
- }
-
- Multimap<String, String> relocations = calculateRelocations(endpointMap);
-
- writeln("%-42s %-15s %-15s", "Token", "From", "To");
- writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~");
-
- IPartitioner<?> partitioner = getPartitioner();
-
- // Store relocations on remote nodes.
- for (String endpoint : relocations.keySet())
- {
- for (String tok : relocations.get(endpoint))
- writeln("%-42s %-15s %-15s", tok, tokenMap.get(tok), endpoint);
-
- executeCqlQuery(endpoint, createShuffleBatchInsert(relocations.get(endpoint), partitioner));
- }
-
- if (enable)
- enableRelocations(relocations.keySet());
- }
-
- /**
- * Print a list of pending token relocations for all nodes.
- *
- * @throws ShuffleError
- */
- public void ls() throws ShuffleError
- {
- Map<String, List<CqlRow>> queuedRelocations = listRelocations();
- boolean justOnce = false;
- IPartitioner<?> partitioner = getPartitioner();
-
- for (String host : queuedRelocations.keySet())
- {
- for (CqlRow row : queuedRelocations.get(host))
- {
- assert row.getColumns().size() == 2;
-
- if (!justOnce)
- {
- writeln("%-42s %-15s %s", "Token", "Endpoint", "Requested at");
- writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
- justOnce = true;
- }
-
- ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue());
- ByteBuffer requestedAt = ByteBuffer.wrap(row.getColumns().get(1).getValue());
- Date time = TimestampSerializer.instance.deserialize(requestedAt);
- Token<?> token = partitioner.getTokenFactory().fromByteArray(tokenBytes);
-
- writeln("%-42s %-15s %s", token.toString(), host, time.toString());
- }
- }
- }
-
- /**
- * List pending token relocations for all nodes.
- *
- * @throws ShuffleError
- */
- private Map<String, List<CqlRow>> listRelocations() throws ShuffleError
- {
- String cqlQuery = "SELECT token_bytes,requested_at FROM system.range_xfers";
- Map<String, List<CqlRow>> results = new HashMap<>();
-
- for (String host : getLiveNodes())
- {
- CqlResult result = executeCqlQuery(host, cqlQuery);
- results.put(host, result.getRows());
- }
-
- return results;
- }
-
- /**
- * Clear pending token relocations on all nodes.
- *
- * @throws ShuffleError
- */
- public void clear() throws ShuffleError
- {
- Map<String, List<CqlRow>> queuedRelocations = listRelocations();
-
- for (String host : queuedRelocations.keySet())
- {
- for (CqlRow row : queuedRelocations.get(host))
- {
- assert row.getColumns().size() == 2;
-
- ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue());
- String query = String.format("DELETE FROM system.range_xfers WHERE token_bytes = 0x%s",
- ByteBufferUtil.bytesToHex(tokenBytes));
- executeCqlQuery(host, query);
- }
- }
- }
-
- /**
- * Enable shuffling on all nodes in the cluster.
- *
- * @throws ShuffleError
- */
- public void enable() throws ShuffleError
- {
- enableRelocations(getLiveNodes());
- }
-
- /**
- * Disable shuffling on all nodes in the cluster.
- *
- * @throws ShuffleError
- */
- public void disable() throws ShuffleError
- {
- disableRelocations(getLiveNodes());
- }
-
- /**
- * Setup and return a new Thrift RPC connection.
- *
- * @param hostName hostname or address to connect to
- * @return a CassandraClient instance
- * @throws ShuffleError
- */
- private CassandraClient getThriftClient(String hostName) throws ShuffleError
- {
- try
- {
- return new CassandraClient(hostName, thriftPort, thriftFramed, thriftUsername, thriftPassword);
- }
- catch (TException e)
- {
- throw new ShuffleError(String.format("Unable to connect to %s/%d: %s", hostName, port, e.getMessage()));
- }
- }
-
- /**
- * Execute a CQL v3 query.
- *
- * @param hostName hostname or address to connect to
- * @param cqlQuery CQL query string
- * @return a Thrift CqlResult instance
- * @throws ShuffleError
- */
- private CqlResult executeCqlQuery(String hostName, String cqlQuery) throws ShuffleError
- {
- try (CassandraClient client = getThriftClient(hostName))
- {
- return client.execute_cql_query(ByteBuffer.wrap(cqlQuery.getBytes()), Compression.NONE);
- }
- catch (UnavailableException e)
- {
- throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName));
- }
- catch (TimedOutException e)
- {
- throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Return a partitioner instance for remote host.
- *
- * @return an IPartitioner instance
- * @throws ShuffleError
- */
- private IPartitioner<?> getPartitioner() throws ShuffleError
- {
- String partitionerName;
- try
- {
- partitionerName = getThriftClient(thriftHost).describe_partitioner();
- }
- catch (TException e)
- {
- throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, port, e.getMessage()));
- }
-
- try
- {
- return (IPartitioner<?>) Class.forName(partitionerName).newInstance();
- }
- catch (ClassNotFoundException e)
- {
- throw new ShuffleError("Unable to locate class for partitioner: " + partitionerName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Create and return a CQL batch insert statement for a set of token relocations.
- *
- * @param tokens tokens to be relocated
- * @param partitioner an instance of the IPartitioner in use
- * @return a query string
- */
- private String createShuffleBatchInsert(Collection<String> tokens, IPartitioner<?> partitioner)
- {
- StringBuilder query = new StringBuilder();
- query.append("BEGIN BATCH").append("\n");
-
- for (String tokenStr : tokens)
- {
- Token<?> token = partitioner.getTokenFactory().fromString(tokenStr);
- String hexToken = ByteBufferUtil.bytesToHex(partitioner.getTokenFactory().toByteArray(token));
- query.append("INSERT INTO system.range_xfers (token_bytes, requested_at) ")
- .append("VALUES (").append("0x").append(hexToken).append(", 'now');").append("\n");
- }
-
- query.append("APPLY BATCH").append("\n");
- return query.toString();
- }
-
- /** Print usage information. */
- private static void printShuffleHelp()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("Sub-commands:").append(String.format("%n"));
- sb.append(" create Initialize a new shuffle operation").append(String.format("%n"));
- sb.append(" ls List pending relocations").append(String.format("%n"));
- sb.append(" clear Clear pending relocations").append(String.format("%n"));
- sb.append(" en[able] Enable shuffling").append(String.format("%n"));
- sb.append(" dis[able] Disable shuffling").append(String.format("%n%n"));
-
- printHelp("shuffle [options] <sub-command>", sb.toString());
- }
-
- /**
- * Execute.
- *
- * @param args arguments passed on the command line
- * @throws Exception when face meets palm
- */
- public static void main(String[] args) throws Exception
- {
- CommandLine cmd = null;
- try
- {
- cmd = processArguments(args);
- }
- catch (MissingArgumentException e)
- {
- System.err.println(e.getMessage());
- System.exit(1);
- }
-
- // Sub command argument.
- if (cmd.getArgList().size() < 1)
- {
- System.err.println("Missing sub-command argument.");
- printShuffleHelp();
- System.exit(1);
- }
- String subCommand = (String)(cmd.getArgList()).get(0);
-
- String hostName = (cmd.getOptionValue("host") != null) ? cmd.getOptionValue("host") : DEFAULT_HOST;
- String port = (cmd.getOptionValue("port") != null) ? cmd.getOptionValue("port") : Integer.toString(DEFAULT_JMX_PORT);
- String username = cmd.getOptionValue("username");
- String password = cmd.getOptionValue("password");
- String thriftHost = (cmd.getOptionValue("thrift-host") != null) ? cmd.getOptionValue("thrift-host") : hostName;
- String thriftPort = (cmd.getOptionValue("thrift-port") != null) ? cmd.getOptionValue("thrift-port") : "9160";
- String thriftUsername = (cmd.getOptionValue("thrift-user") != null) ? cmd.getOptionValue("thrift-user") : null;
- String thriftPassword = (cmd.getOptionValue("thrift-password") != null) ? cmd.getOptionValue("thrift-password") : null;
- String onlyDc = cmd.getOptionValue("only-dc");
- boolean thriftFramed = cmd.hasOption("thrift-framed");
- boolean andEnable = cmd.hasOption("and-enable");
- int portNum = -1, thriftPortNum = -1;
-
- // Parse JMX port number
- if (port != null)
- {
- try
- {
- portNum = Integer.parseInt(port);
- }
- catch (NumberFormatException ferr)
- {
- System.err.printf("%s is not a valid JMX port number.%n", port);
- System.exit(1);
- }
- }
- else
- {
- portNum = DEFAULT_JMX_PORT;
- }
-
- // Parse Thrift port number
- if (thriftPort != null)
- {
- try
- {
- thriftPortNum = Integer.parseInt(thriftPort);
- }
- catch (NumberFormatException ferr)
- {
- System.err.printf("%s is not a valid port number.%n", thriftPort);
- System.exit(1);
- }
- }
- else
- {
- thriftPortNum = 9160;
- }
-
- Shuffle shuffler = new Shuffle(hostName,
- portNum,
- thriftHost,
- thriftPortNum,
- thriftFramed,
- username,
- password,
- thriftUsername,
- thriftPassword);
-
- try
- {
- if (subCommand.equals("create"))
- shuffler.shuffle(andEnable, onlyDc);
- else if (subCommand.equals("ls"))
- shuffler.ls();
- else if (subCommand.startsWith("en"))
- shuffler.enable();
- else if (subCommand.startsWith("dis"))
- shuffler.disable();
- else if (subCommand.equals("clear"))
- shuffler.clear();
- else
- {
- System.err.println("Unknown subcommand: " + subCommand);
- printShuffleHelp();
- System.exit(1);
- }
- }
- catch (ShuffleError err)
- {
- shuffler.writeln(err);
- System.exit(1);
- }
- finally
- {
- shuffler.close();
- }
-
- System.exit(0);
- }
-
- /** A self-contained Cassandra.Client; Closeable. */
- private static class CassandraClient implements Closeable
- {
- TTransport transport;
- Cassandra.Client client;
-
- CassandraClient(String hostName, int port, boolean framed, String username, String password) throws TException
- {
- TSocket socket = new TSocket(hostName, port);
- transport = (framed) ? socket : new TFastFramedTransport(socket);
- transport.open();
- client = new Cassandra.Client(new TBinaryProtocol(transport));
-
- if (username != null && password != null)
- {
- AuthenticationRequest request = new AuthenticationRequest();
- request.putToCredentials("username", username);
- request.putToCredentials("password", password);
- client.login(request);
- }
-
- client.set_cql_version("3.0.0");
- }
-
- CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception
- {
- return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE);
- }
-
- String describe_partitioner() throws TException
- {
- return client.describe_partitioner();
- }
-
- Map<String, String> describe_token_map() throws TException
- {
- return client.describe_token_map();
- }
-
- public void close()
- {
- transport.close();
- }
- }
-
- @SuppressWarnings("serial")
- class ShuffleError extends Exception
- {
- ShuffleError(String msg)
- {
- super(msg);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc3e0dbd/test/unit/org/apache/cassandra/service/RelocateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java
deleted file mode 100644
index e8f66ad..0000000
--- a/test/unit/org/apache/cassandra/service/RelocateTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.SimpleSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class RelocateTest
-{
- private static final int TOKENS_PER_NODE = 256;
- private static final int TOKEN_STEP = 10;
- private static final IPartitioner<?> partitioner = new RandomPartitioner();
- private static IPartitioner<?> oldPartitioner;
- private static VersionedValue.VersionedValueFactory vvFactory;
-
- private StorageService ss = StorageService.instance;
- private TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-
- @Before
- public void init()
- {
- tmd.clearUnsafe();
- }
-
- @BeforeClass
- public static void setUp() throws Exception
- {
- oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
- SchemaLoader.loadSchema();
- vvFactory = new VersionedValue.VersionedValueFactory(partitioner);
- }
-
- @AfterClass
- public static void tearDown() throws Exception
- {
- StorageService.instance.setPartitionerUnsafe(oldPartitioner);
- SchemaLoader.stopGossiper();
- }
-
- /** Setup a virtual node ring */
- private static Map<Token<?>, InetAddress> createInitialRing(int size) throws UnknownHostException
- {
- Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, InetAddress>();
- int currentToken = TOKEN_STEP;
-
- for(int i = 0; i < size; i++)
- {
- InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
- List<Token> tokens = new ArrayList<Token>();
-
- for (int j = 0; j < TOKENS_PER_NODE; j++)
- {
- Token token = new BigIntegerToken(String.valueOf(currentToken));
- tokenMap.put(token, endpoint);
- tokens.add(token);
- currentToken += TOKEN_STEP;
- }
-
- Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens));
- StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens));
- }
-
- return tokenMap;
- }
-
- // Copy-paste from MoveTest.java
- private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
- {
- KSMetaData ksmd = Schema.instance.getKSMetaData(keyspaceName);
- return AbstractReplicationStrategy.createReplicationStrategy(
- keyspaceName,
- ksmd.strategyClass,
- tmd,
- new SimpleSnitch(),
- ksmd.strategyOptions);
- }
-
- /** Ensure proper write endpoints during relocation */
- @Test
- public void testWriteEndpointsDuringRelocate() throws Exception
- {
- Map<Token<?>, InetAddress> tokenMap = createInitialRing(5);
- Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>();
-
-
- for (Token<?> token : tokenMap.keySet())
- {
- BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
- List<InetAddress> endpoints = new ArrayList<InetAddress>();
- Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false);
- while (tokenIter.hasNext())
- {
- InetAddress ep = tmd.getEndpoint(tokenIter.next());
- if (!endpoints.contains(ep))
- endpoints.add(ep);
- }
- expectedEndpoints.put(keyToken, endpoints);
- }
-
- // Relocate the first token from the first endpoint, to the second endpoint.
- Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP));
- ss.onChange(
- InetAddress.getByName("127.0.0.2"),
- ApplicationState.STATUS,
- vvFactory.relocating(Collections.singleton(relocateToken)));
- assertTrue(tmd.isRelocating(relocateToken));
-
- AbstractReplicationStrategy strategy;
- for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
- {
- strategy = getStrategy(keyspaceName, tmd);
- for (Token token : tokenMap.keySet())
- {
- BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-
- HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, keyspaceName, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap())));
- HashSet<InetAddress> expected = new HashSet<InetAddress>();
-
- for (int i = 0; i < actual.size(); i++)
- expected.add(expectedEndpoints.get(keyToken).get(i));
-
- assertEquals("mismatched endpoint sets", expected, actual);
- }
- }
- }
-
- /** Use STATUS changes to trigger membership update and validate results. */
- @Test
- public void testRelocationSuccess() throws UnknownHostException
- {
- createInitialRing(5);
-
- // Node handling the relocation (dst), and the token being relocated (src).
- InetAddress relocator = InetAddress.getByName("127.0.0.3");
- Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-
- // Send RELOCATING and ensure token status
- ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee)));
- assertTrue(tmd.isRelocating(relocatee));
-
- // Create a list of the endpoint's existing tokens, and add the relocatee to it.
- List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator));
- SystemKeyspace.updateTokens(tokens);
- tokens.add(relocatee);
-
- // Send a normal status, then ensure all is copesetic.
- Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens));
- ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens));
-
- // Relocating entries are removed after RING_DELAY
- Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY + 10, TimeUnit.MILLISECONDS);
-
- assertTrue(!tmd.isRelocating(relocatee));
- assertEquals(tmd.getEndpoint(relocatee), relocator);
- }
-}