You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/07/31 05:08:54 UTC
git commit: fix merge
Repository: cassandra
Updated Branches:
refs/heads/trunk fafcfc787 -> da3fd5d7a
fix merge
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3fd5d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3fd5d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3fd5d7
Branch: refs/heads/trunk
Commit: da3fd5d7ac2bb0f64a32176c3722b857b71bc654
Parents: fafcfc7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jul 30 22:07:10 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jul 30 22:08:40 2014 -0500
----------------------------------------------------------------------
.../apache/cassandra/locator/TokenMetadata.java | 21 +-
.../service/PendingRangeCalculatorService.java | 73 +------
.../ScheduledRangeTransferExecutorService.java | 135 ------------
.../apache/cassandra/service/RelocateTest.java | 204 -------------------
4 files changed, 4 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index bb5455c..2a6a624 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -701,10 +701,10 @@ public class TokenMetadata
{
Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create();
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty() && relocatingTokens.isEmpty())
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
{
if (logger.isDebugEnabled())
- logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
+ logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
pendingRanges.put(keyspaceName, newPendingRanges);
return;
@@ -746,7 +746,7 @@ public class TokenMetadata
}
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
- // We can now finish the calculation by checking moving and relocating nodes.
+ // We can now finish the calculation by checking moving nodes.
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
@@ -765,20 +765,6 @@ public class TokenMetadata
allLeftMetadata.removeEndpoint(endpoint);
}
- // Ranges being relocated.
- for (Map.Entry<Token, InetAddress> relocating : relocatingTokens.entrySet())
- {
- InetAddress endpoint = relocating.getValue(); // address of the moving node
- Token token = relocating.getKey();
-
- allLeftMetadata.updateNormalToken(token, endpoint);
-
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- newPendingRanges.put(range, endpoint);
-
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
pendingRanges.put(keyspaceName, newPendingRanges);
if (logger.isDebugEnabled())
@@ -936,7 +922,6 @@ public class TokenMetadata
leavingEndpoints.clear();
pendingRanges.clear();
movingEndpoints.clear();
- relocatingTokens.clear();
sortedTokens.clear();
topology.clear();
invalidateCachedRings();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index d3aa6b6..2276c4a 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -96,77 +96,6 @@ public class PendingRangeCalculatorService
// public & static for testing purposes
public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
{
- TokenMetadata tm = StorageService.instance.getTokenMetadata();
- Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
- BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
- Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
-
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
- {
- if (logger.isDebugEnabled())
- logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
- tm.setPendingRanges(keyspaceName, pendingRanges);
- return;
- }
-
- Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
- // Copy of metadata reflecting the situation after all leave operations are finished.
- TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
-
- // get all ranges that will be affected by leaving nodes
- Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
- for (InetAddress endpoint : leavingEndpoints)
- affectedRanges.addAll(addressRanges.get(endpoint));
-
- // for each of those ranges, find what new nodes will be responsible for the range when
- // all leaving nodes are gone.
- for (Range<Token> range : affectedRanges)
- {
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
- }
-
- // At this stage pendingRanges has been updated according to leave operations. We can
- // now continue the calculation by checking bootstrapping nodes.
-
- // For each of the bootstrapping nodes, simply add and remove them one by one to
- // allLeftMetadata and check in between what their ranges would be.
- Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
- for (InetAddress endpoint : bootstrapAddresses.keySet())
- {
- Collection<Token> tokens = bootstrapAddresses.get(endpoint);
-
- allLeftMetadata.updateNormalTokens(tokens, endpoint);
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- pendingRanges.put(range, endpoint);
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
- // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
- // We can now finish the calculation by checking moving and relocating nodes.
-
- // For each of the moving nodes, we do the same thing we did for bootstrapping:
- // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
- for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
- {
- InetAddress endpoint = moving.right; // address of the moving node
-
- // moving.left is a new token of the endpoint
- allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- {
- pendingRanges.put(range, endpoint);
- }
-
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
- tm.setPendingRanges(keyspaceName, pendingRanges);
-
- if (logger.isDebugEnabled())
- logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
+ StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
deleted file mode 100644
index b8117b9..0000000
--- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ScheduledRangeTransferExecutorService
-{
- private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class);
- private static final int INTERVAL = 10;
- private ScheduledExecutorService scheduler;
-
- public void setup()
- {
- if (DatabaseDescriptor.getNumTokens() == 1)
- {
- LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled");
- return;
- }
-
- scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory());
- scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS);
- LOG.info("Enabling scheduled transfers of token ranges");
- }
-
- public void tearDown()
- {
- if (scheduler == null)
- {
- LOG.warn("Unable to shutdown; Scheduler never enabled");
- return;
- }
-
- LOG.info("Shutting down range transfer scheduler");
- scheduler.shutdownNow();
- }
-}
-
-class RangeTransfer implements Runnable
-{
- private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class);
-
- public void run()
- {
- UntypedResultSet res = executeInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
-
- if (res.size() < 1)
- {
- LOG.info("No queued ranges to transfer, shuffle complete. Run 'cassandra-shuffle disable' to stop this message.");
- return;
- }
-
- if (!isReady())
- return;
-
- UntypedResultSet.Row row = res.iterator().next();
-
- Date requestedAt = row.getTimestamp("requested_at");
- ByteBuffer tokenBytes = row.getBytes("token_bytes");
- Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes);
-
- LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString());
- try
- {
- StorageService.instance.relocateTokens(Collections.singleton(token));
- }
- catch (Exception e)
- {
- LOG.error("Error removing {}: {}", token, e);
- }
- finally
- {
- LOG.debug("Removing queued entry for transfer of {}", token);
- executeInternal(String.format("DELETE FROM system.%s WHERE token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes);
- }
- }
-
- private boolean isReady()
- {
- int targetTokens = DatabaseDescriptor.getNumTokens();
- int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10));
- int actualTokens = StorageService.instance.getTokens().size();
-
- if (actualTokens >= highMark)
- {
- LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens);
- return false;
- }
-
- return true;
- }
-}
-
-class RangeTransferThreadFactory implements ThreadFactory
-{
- private AtomicInteger count = new AtomicInteger(0);
-
- public Thread newThread(Runnable r)
- {
- Thread rangeXferThread = new Thread(r);
- rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement()));
- return rangeXferThread;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/test/unit/org/apache/cassandra/service/RelocateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java
deleted file mode 100644
index 22a992c..0000000
--- a/test/unit/org/apache/cassandra/service/RelocateTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.SimpleSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class RelocateTest
-{
- private static final int TOKENS_PER_NODE = 256;
- private static final int TOKEN_STEP = 10;
- private static final IPartitioner<?> partitioner = new RandomPartitioner();
- private static IPartitioner<?> oldPartitioner;
- private static VersionedValue.VersionedValueFactory vvFactory;
-
- private StorageService ss = StorageService.instance;
- private TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-
- @Before
- public void init()
- {
- tmd.clearUnsafe();
- }
-
- @BeforeClass
- public static void setUp() throws Exception
- {
- oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
- SchemaLoader.loadSchema();
- vvFactory = new VersionedValue.VersionedValueFactory(partitioner);
- }
-
- @AfterClass
- public static void tearDown() throws Exception
- {
- StorageService.instance.setPartitionerUnsafe(oldPartitioner);
- }
-
- /** Setup a virtual node ring */
- private static Map<Token<?>, InetAddress> createInitialRing(int size) throws UnknownHostException
- {
- Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, InetAddress>();
- int currentToken = TOKEN_STEP;
-
- for(int i = 0; i < size; i++)
- {
- InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
- List<Token> tokens = new ArrayList<Token>();
-
- for (int j = 0; j < TOKENS_PER_NODE; j++)
- {
- Token token = new BigIntegerToken(String.valueOf(currentToken));
- tokenMap.put(token, endpoint);
- tokens.add(token);
- currentToken += TOKEN_STEP;
- }
-
- Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens));
- StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens));
- }
-
- return tokenMap;
- }
-
- // Copy-paste from MoveTest.java
- private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
- {
- KSMetaData ksmd = Schema.instance.getKSMetaData(keyspaceName);
- return AbstractReplicationStrategy.createReplicationStrategy(
- keyspaceName,
- ksmd.strategyClass,
- tmd,
- new SimpleSnitch(),
- ksmd.strategyOptions);
- }
-
- /** Ensure proper write endpoints during relocation */
- @Test
- public void testWriteEndpointsDuringRelocate() throws Exception
- {
- Map<Token<?>, InetAddress> tokenMap = createInitialRing(5);
- Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>();
-
-
- for (Token<?> token : tokenMap.keySet())
- {
- BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
- List<InetAddress> endpoints = new ArrayList<InetAddress>();
- Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false);
- while (tokenIter.hasNext())
- {
- InetAddress ep = tmd.getEndpoint(tokenIter.next());
- if (!endpoints.contains(ep))
- endpoints.add(ep);
- }
- expectedEndpoints.put(keyToken, endpoints);
- }
-
- // Relocate the first token from the first endpoint, to the second endpoint.
- Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP));
- ss.onChange(
- InetAddress.getByName("127.0.0.2"),
- ApplicationState.STATUS,
- vvFactory.relocating(Collections.singleton(relocateToken)));
- assertTrue(tmd.isRelocating(relocateToken));
-
- AbstractReplicationStrategy strategy;
- for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
- {
- strategy = getStrategy(keyspaceName, tmd);
- for (Token token : tokenMap.keySet())
- {
- BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-
- HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, keyspaceName, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap())));
- HashSet<InetAddress> expected = new HashSet<InetAddress>();
-
- for (int i = 0; i < actual.size(); i++)
- expected.add(expectedEndpoints.get(keyToken).get(i));
-
- assertEquals("mismatched endpoint sets", expected, actual);
- }
- }
- }
-
- /** Use STATUS changes to trigger membership update and validate results. */
- @Test
- public void testRelocationSuccess() throws UnknownHostException
- {
- createInitialRing(5);
-
- // Node handling the relocation (dst), and the token being relocated (src).
- InetAddress relocator = InetAddress.getByName("127.0.0.3");
- Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-
- // Send RELOCATING and ensure token status
- ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee)));
- assertTrue(tmd.isRelocating(relocatee));
-
- // Create a list of the endpoint's existing tokens, and add the relocatee to it.
- List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator));
- SystemKeyspace.updateTokens(tokens);
- tokens.add(relocatee);
-
- // Send a normal status, then ensure all is copesetic.
- Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens));
- ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens));
-
- // Relocating entries are removed after RING_DELAY
- Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY + 10, TimeUnit.MILLISECONDS);
-
- assertTrue(!tmd.isRelocating(relocatee));
- assertEquals(tmd.getEndpoint(relocatee), relocator);
- }
-}