You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/03/26 16:53:30 UTC
[1/3] git commit: cleanup
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 45a6373e4 -> 76101027e
refs/heads/trunk 228041629 -> ebe592173
cleanup
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76101027
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76101027
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76101027
Branch: refs/heads/cassandra-2.1
Commit: 76101027eef2fa097b2b55cdc037a3bfbe235753
Parents: 45a6373
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Mar 26 10:50:41 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Mar 26 10:50:41 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 249 ++++++++++---------
1 file changed, 130 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76101027/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 17bd514..042e2bc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -183,9 +183,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
- private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
+ private static final ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
- private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+ private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
@@ -564,7 +564,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Thread drainOnShutdown = new Thread(new WrappedRunnable()
{
@Override
- public void runMayThrow() throws ExecutionException, InterruptedException, IOException
+ public void runMayThrow() throws InterruptedException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
@@ -584,7 +584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
- List<Future<?>> flushes = new ArrayList<Future<?>>();
+ List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.all())
{
KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName());
@@ -634,7 +634,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
joined = true;
Collection<Token> tokens = null;
- Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+ Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
@@ -684,7 +684,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
//
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
- Set<InetAddress> current = new HashSet<InetAddress>();
+ Set<InetAddress> current = new HashSet<>();
logger.debug("Bootstrap variables: {} {} {} {}",
DatabaseDescriptor.isAutoBootstrap(),
SystemKeyspace.bootstrapInProgress(),
@@ -800,7 +800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- tokens = new ArrayList<Token>(initialTokens.size());
+ tokens = new ArrayList<>(initialTokens.size());
for (String token : initialTokens)
tokens.add(getPartitioner().getTokenFactory().fromString(token));
logger.info("Saved tokens not found. Using configuration value: {}", tokens);
@@ -1054,7 +1054,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
{
/* All the ranges for the tokens */
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
{
map.put(entry.getKey().asList(), stringify(entry.getValue()));
@@ -1085,10 +1085,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
{
/* All the ranges for the tokens */
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
{
- List<String> rpcaddrs = new ArrayList<String>(entry.getValue().size());
+ List<String> rpcaddrs = new ArrayList<>(entry.getValue().size());
for (InetAddress endpoint: entry.getValue())
{
rpcaddrs.add(getRpcaddress(endpoint));
@@ -1105,10 +1105,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(keyspace).entrySet())
{
- List<InetAddress> l = new ArrayList<InetAddress>(entry.getValue());
+ List<InetAddress> l = new ArrayList<>(entry.getValue());
map.put(entry.getKey().asList(), stringify(l));
}
return map;
@@ -1189,7 +1189,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IOException(e.getMessage());
}
- List<String> result = new ArrayList<String>(tokenRanges.size());
+ List<String> result = new ArrayList<>(tokenRanges.size());
for (TokenRange tokenRange : tokenRanges)
result.add(tokenRange.toString());
@@ -1227,7 +1227,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy)
throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace);
- List<TokenRange> ranges = new ArrayList<TokenRange>();
+ List<TokenRange> ranges = new ArrayList<>();
Token.TokenFactory tf = getPartitioner().getTokenFactory();
Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
@@ -1239,9 +1239,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Range range = entry.getKey();
List<InetAddress> addresses = entry.getValue();
- List<String> endpoints = new ArrayList<String>(addresses.size());
- List<String> rpc_endpoints = new ArrayList<String>(addresses.size());
- List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>(addresses.size());
+ List<String> endpoints = new ArrayList<>(addresses.size());
+ List<String> rpc_endpoints = new ArrayList<>(addresses.size());
+ List<EndpointDetails> epDetails = new ArrayList<>(addresses.size());
for (InetAddress endpoint : addresses)
{
@@ -1270,8 +1270,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
// in order to preserve tokens in ascending order, we use LinkedHashMap here
- Map<String, String> mapString = new LinkedHashMap<String, String>(mapInetAddress.size());
- List<Token> tokens = new ArrayList<Token>(mapInetAddress.keySet());
+ Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size());
+ List<Token> tokens = new ArrayList<>(mapInetAddress.keySet());
Collections.sort(tokens);
for (Token token : tokens)
{
@@ -1287,7 +1287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, String> getHostIdMap()
{
- Map<String, String> mapOut = new HashMap<String, String>();
+ Map<String, String> mapOut = new HashMap<>();
for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString());
return mapOut;
@@ -1301,7 +1301,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
- Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<Range<Token>, List<InetAddress>>();
+ Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>();
for (Range<Token> range : ranges)
{
rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
@@ -1358,20 +1358,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
String moveName = pieces[0];
- if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
- handleStateBootstrap(endpoint);
- else if (moveName.equals(VersionedValue.STATUS_NORMAL))
- handleStateNormal(endpoint);
- else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
- handleStateRemoving(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_LEAVING))
- handleStateLeaving(endpoint);
- else if (moveName.equals(VersionedValue.STATUS_LEFT))
- handleStateLeft(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_MOVING))
- handleStateMoving(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
- handleStateRelocating(endpoint, pieces);
+ switch (moveName)
+ {
+ case VersionedValue.STATUS_BOOTSTRAPPING:
+ handleStateBootstrap(endpoint);
+ break;
+ case VersionedValue.STATUS_NORMAL:
+ handleStateNormal(endpoint);
+ break;
+ case VersionedValue.REMOVING_TOKEN:
+ case VersionedValue.REMOVED_TOKEN:
+ handleStateRemoving(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_LEAVING:
+ handleStateLeaving(endpoint);
+ break;
+ case VersionedValue.STATUS_LEFT:
+ handleStateLeft(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_MOVING:
+ handleStateMoving(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_RELOCATING:
+ handleStateRelocating(endpoint, pieces);
+ break;
+ }
}
else
{
@@ -1477,10 +1488,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokens = getTokensFor(endpoint);
- Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
- Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<Token>();
- Set<Token> localTokensToRemove = new HashSet<Token>();
- Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>();
+ Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+ Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+ Set<Token> localTokensToRemove = new HashSet<>();
+ Set<InetAddress> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
@@ -1704,7 +1715,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
assert pieces.length >= 2;
- List<Token> tokens = new ArrayList<Token>(pieces.length - 1);
+ List<Token> tokens = new ArrayList<>(pieces.length - 1);
for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length))
tokens.add(getPartitioner().getTokenFactory().fromString(tStr));
@@ -1889,12 +1900,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
- final InetAddress myAddress = FBUtilities.getBroadcastAddress();
+ InetAddress myAddress = FBUtilities.getBroadcastAddress();
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
- Set<Range<Token>> myNewRanges = new HashSet<Range<Token>>();
+ Set<Range<Token>> myNewRanges = new HashSet<>();
for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries())
{
if (entry.getValue().equals(myAddress))
@@ -1908,11 +1919,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
StreamPlan stream = new StreamPlan("Restore replica count");
- for (final String keyspaceName : rangesToFetch.keySet())
+ for (String keyspaceName : rangesToFetch.keySet())
{
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
{
- final InetAddress source = entry.getKey();
+ InetAddress source = entry.getKey();
Collection<Range<Token>> ranges = entry.getValue();
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
@@ -1945,7 +1956,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (logger.isDebugEnabled())
logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", "));
- Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<Range<Token>, List<InetAddress>>();
+ Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>();
// Find (for each range) all nodes that store replicas for these ranges as well
for (Range<Token> range : ranges)
@@ -2051,7 +2062,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, String> getLoadMap()
{
- Map<String, String> map = new HashMap<String, String>();
+ Map<String, String> map = new HashMap<>();
for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
{
map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
@@ -2087,7 +2098,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<String> getTokens(InetAddress endpoint)
{
- List<String> strTokens = new ArrayList<String>();
+ List<String> strTokens = new ArrayList<>();
for (Token tok : getTokenMetadata().getTokens(endpoint))
strTokens.add(tok.toString());
return strTokens;
@@ -2110,7 +2121,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getMovingNodes()
{
- List<String> endpoints = new ArrayList<String>();
+ List<String> endpoints = new ArrayList<>();
for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints())
{
@@ -2155,7 +2166,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<String> stringify(Iterable<InetAddress> endpoints)
{
- List<String> stringEndpoints = new ArrayList<String>();
+ List<String> stringEndpoints = new ArrayList<>();
for (InetAddress ep : endpoints)
{
stringEndpoints.add(ep.getHostAddress());
@@ -2235,7 +2246,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- ArrayList<Keyspace> t = new ArrayList<Keyspace>(keyspaceNames.length);
+ ArrayList<Keyspace> t = new ArrayList<>(keyspaceNames.length);
for (String keyspaceName : keyspaceNames)
t.add(getValidKeyspace(keyspaceName));
keyspaces = t;
@@ -2319,15 +2330,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, TabularData> getSnapshotDetails()
{
- final Map<String, TabularData> snapshotMap = new HashMap<>();
- for (final Keyspace keyspace : Keyspace.all())
+ Map<String, TabularData> snapshotMap = new HashMap<>();
+ for (Keyspace keyspace : Keyspace.all())
{
if (Keyspace.SYSTEM_KS.equals(keyspace.getName()))
continue;
- for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
+ for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
{
- for (final Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
+ for (Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
{
TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey());
if (data == null)
@@ -2346,12 +2357,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public long trueSnapshotsSize()
{
long total = 0;
- for (final Keyspace keyspace : Keyspace.all())
+ for (Keyspace keyspace : Keyspace.all())
{
if (Keyspace.SYSTEM_KS.equals(keyspace.getName()))
continue;
- for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
+ for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
{
total += cfStore.trueSnapshotsSize();
}
@@ -2419,7 +2430,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (idxName != null)
{
- Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<String>(Arrays.asList(cfName)));
+ Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<>(Arrays.asList(cfName)));
if (indexes.isEmpty())
logger.warn(String.format("Invalid column family index specified: %s/%s. Proceeding with others.", baseCfName, idxName));
else
@@ -2449,7 +2460,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param columnFamilies
* @throws IOException
*/
- public void forceKeyspaceFlush(final String keyspaceName, final String... columnFamilies) throws IOException
+ public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
{
@@ -2472,19 +2483,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
+ int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start();
@@ -2492,9 +2503,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
@@ -2503,7 +2514,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
+ int cmd = nextRepairCommand.incrementAndGet();
if (!FBUtilities.isUnix() && isSequential)
{
logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
@@ -2513,33 +2524,33 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd,
- final String keyspace,
- final Collection<Range<Token>> ranges,
- final boolean isSequential,
- final boolean isLocal,
- final boolean fullRepair,
- final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(int cmd,
+ String keyspace,
+ Collection<Range<Token>> ranges,
+ boolean isSequential,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
{
Set<String> dataCenters = null;
if (isLocal)
@@ -2657,9 +2668,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
- public RepairFuture forceKeyspaceRepair(final UUID parentRepairSession,
- final Range<Token> range,
- final String keyspaceName,
+ public RepairFuture forceKeyspaceRepair(UUID parentRepairSession,
+ Range<Token> range,
+ String keyspaceName,
boolean isSequential,
Set<InetAddress> endpoints,
String ... columnFamilies) throws IOException
@@ -2698,13 +2709,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep)
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
- Collection<Range<Token>> primaryRanges = new HashSet<Range<Token>>();
+ Collection<Range<Token>> primaryRanges = new HashSet<>();
TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
for (Token token : metadata.sortedTokens())
{
List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
- primaryRanges.add(new Range<Token>(metadata.getPredecessor(token), token));
+ primaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
}
return primaryRanges;
}
@@ -2750,13 +2761,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (sortedTokens.isEmpty())
return Collections.emptyList();
int size = sortedTokens.size();
- List<Range<Token>> ranges = new ArrayList<Range<Token>>(size + 1);
+ List<Range<Token>> ranges = new ArrayList<>(size + 1);
for (int i = 1; i < size; ++i)
{
- Range<Token> range = new Range<Token>(sortedTokens.get(i - 1), sortedTokens.get(i));
+ Range<Token> range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i));
ranges.add(range);
}
- Range<Token> range = new Range<Token>(sortedTokens.get(size - 1), sortedTokens.get(0));
+ Range<Token> range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0));
ranges.add(range);
return ranges;
@@ -2811,7 +2822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
{
List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
- List<InetAddress> liveEps = new ArrayList<InetAddress>(endpoints.size());
+ List<InetAddress> liveEps = new ArrayList<>(endpoints.size());
for (InetAddress endpoint : endpoints)
{
@@ -2843,9 +2854,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
long totalRowCountEstimate = cfs.estimatedKeysForRange(range);
// splitCount should be much smaller than number of key samples, to avoid huge sampling error
- final int minSamplesPerSplit = 4;
- final int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
- final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
+ int minSamplesPerSplit = 4;
+ int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
+ int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
List<Token> tokens = keysToTokens(range, keys);
return getSplits(tokens, splitCount, cfs);
@@ -2853,7 +2864,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount, ColumnFamilyStore cfs)
{
- final double step = (double) (tokens.size() - 1) / splitCount;
+ double step = (double) (tokens.size() - 1) / splitCount;
Token prevToken = tokens.get(0);
List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount);
for (int i = 1; i <= splitCount; i++)
@@ -2879,7 +2890,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<DecoratedKey> keySamples(Iterable<ColumnFamilyStore> cfses, Range<Token> range)
{
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ List<DecoratedKey> keys = new ArrayList<>();
for (ColumnFamilyStore cfs : cfses)
Iterables.addAll(keys, cfs.keySamples(range));
FBUtilities.sortSampledKeys(keys, range);
@@ -2943,11 +2954,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
}
- private void unbootstrap(final Runnable onFinish)
+ private void unbootstrap(Runnable onFinish)
{
- Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<String, Multimap<Range<Token>, InetAddress>>();
+ Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
- for (final String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+ for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress());
@@ -2984,7 +2995,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Futures.immediateFuture(null);
// gather all live nodes in the cluster that aren't also leaving
- List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
+ List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
candidates.remove(FBUtilities.getBroadcastAddress());
for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
{
@@ -3006,7 +3017,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// stream all hints -- range list will be a singleton of "the entire ring"
Token token = StorageService.getPartitioner().getMinimumToken();
- List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
+ List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
Keyspace.SYSTEM_KS,
@@ -3097,7 +3108,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private class RangeRelocator
{
- private StreamPlan streamPlan = new StreamPlan("Bootstrap");
+ private final StreamPlan streamPlan = new StreamPlan("Bootstrap");
private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames)
{
@@ -3189,7 +3200,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void relocate(Collection<String> srcTokens) throws IOException
{
- List<Token> tokens = new ArrayList<Token>(srcTokens.size());
+ List<Token> tokens = new ArrayList<>(srcTokens.size());
try
{
for (String srcT : srcTokens)
@@ -3213,7 +3224,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
assert srcTokens != null;
InetAddress localAddress = FBUtilities.getBroadcastAddress();
Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress);
- Set<Token> tokens = new HashSet<Token>(srcTokens);
+ Set<Token> tokens = new HashSet<>(srcTokens);
Iterator<Token> it = tokens.iterator();
while (it.hasNext())
@@ -3469,7 +3480,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
totalCFs += keyspace.getColumnFamilyStores().size();
remainingCFs = totalCFs;
// flush
- List<Future<?>> flushes = new ArrayList<Future<?>>();
+ List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
@@ -3543,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<Token> sortedTokens = tokenMetadata.sortedTokens();
// describeOwnership returns tokens in an unspecified order, let's re-order them
Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens));
- Map<InetAddress, Float> nodeMap = new LinkedHashMap<InetAddress, Float>();
+ Map<InetAddress, Float> nodeMap = new LinkedHashMap<>();
for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
{
InetAddress endpoint = tokenMetadata.getEndpoint(entry.getKey());
@@ -3577,9 +3588,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
- Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>();
+ Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<>();
// mapping of dc's to nodes, use sorted map so that we get dcs sorted
- SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
+ SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<>();
sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
endpointsGroupedByDc.add(endpoints);
@@ -3625,7 +3636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getKeyspaces()
{
- List<String> keyspaceNamesList = new ArrayList<String>(Schema.instance.getKeyspaces());
+ List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getKeyspaces());
return Collections.unmodifiableList(keyspaceNamesList);
}
@@ -3668,10 +3679,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
- private Future<StreamState> streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
+ private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
{
// First, we build a list of ranges to stream to each host, per table
- final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<String, Map<InetAddress, List<Range<Token>>>>();
+ Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>();
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet())
{
String keyspace = entry.getKey();
@@ -3680,16 +3691,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (rangesWithEndpoints.isEmpty())
continue;
- Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>();
- for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
+ Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>();
+ for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
{
- final Range<Token> range = endPointEntry.getKey();
- final InetAddress endpoint = endPointEntry.getValue();
+ Range<Token> range = endPointEntry.getKey();
+ InetAddress endpoint = endPointEntry.getValue();
List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
if (curRanges == null)
{
- curRanges = new LinkedList<Range<Token>>();
+ curRanges = new LinkedList<>();
rangesPerEndpoint.put(endpoint, curRanges);
}
curRanges.add(range);
@@ -3701,13 +3712,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet())
{
- final String keyspaceName = entry.getKey();
- final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
+ String keyspaceName = entry.getKey();
+ Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
- for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
+ for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
{
- final List<Range<Token>> ranges = rangesEntry.getValue();
- final InetAddress newEndpoint = rangesEntry.getKey();
+ List<Range<Token>> ranges = rangesEntry.getValue();
+ InetAddress newEndpoint = rangesEntry.getKey();
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
@@ -3726,8 +3737,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated)
{
- Set<Range<Token>> toStream = new HashSet<Range<Token>>();
- Set<Range<Token>> toFetch = new HashSet<Range<Token>>();
+ Set<Range<Token>> toStream = new HashSet<>();
+ Set<Range<Token>> toFetch = new HashSet<>();
for (Range r1 : current)
@@ -3846,14 +3857,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public List<String> sampleKeyRange() // do not rename to getter - see CASSANDRA-4452 for details
{
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ List<DecoratedKey> keys = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (Range<Token> range : getPrimaryRangesForEndpoint(keyspace.getName(), FBUtilities.getBroadcastAddress()))
keys.addAll(keySamples(keyspace.getColumnFamilyStores(), range));
}
- List<String> sampledKeys = new ArrayList<String>(keys.size());
+ List<String> sampledKeys = new ArrayList<>(keys.size());
for (DecoratedKey key : keys)
sampledKeys.add(key.getToken().toString());
return sampledKeys;
[3/3] git commit: merge from 2.1
Posted by jb...@apache.org.
merge from 2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebe59217
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebe59217
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebe59217
Branch: refs/heads/trunk
Commit: ebe5921733dc3bd14b9345fa650bb28cec5f7e81
Parents: 2280416 7610102
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Mar 26 10:53:00 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Mar 26 10:53:00 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 254 ++++++++++---------
1 file changed, 131 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebe59217/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index bfcfcce,042e2bc..5fe3727
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2503,11 -2514,13 +2514,8 @@@ public class StorageService extends Not
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
- if (ranges.size() > 0)
- {
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
- }
+ int cmd = nextRepairCommand.incrementAndGet();
- if (!FBUtilities.isUnix() && isSequential)
- {
- logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
- isSequential = false;
- }
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
return cmd;
}
[2/3] git commit: cleanup
Posted by jb...@apache.org.
cleanup
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76101027
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76101027
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76101027
Branch: refs/heads/trunk
Commit: 76101027eef2fa097b2b55cdc037a3bfbe235753
Parents: 45a6373
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Mar 26 10:50:41 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Mar 26 10:50:41 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 249 ++++++++++---------
1 file changed, 130 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76101027/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 17bd514..042e2bc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -183,9 +183,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
- private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
+ private static final ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
- private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+ private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
@@ -564,7 +564,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Thread drainOnShutdown = new Thread(new WrappedRunnable()
{
@Override
- public void runMayThrow() throws ExecutionException, InterruptedException, IOException
+ public void runMayThrow() throws InterruptedException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
@@ -584,7 +584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
- List<Future<?>> flushes = new ArrayList<Future<?>>();
+ List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.all())
{
KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName());
@@ -634,7 +634,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
joined = true;
Collection<Token> tokens = null;
- Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+ Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
@@ -684,7 +684,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
//
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
- Set<InetAddress> current = new HashSet<InetAddress>();
+ Set<InetAddress> current = new HashSet<>();
logger.debug("Bootstrap variables: {} {} {} {}",
DatabaseDescriptor.isAutoBootstrap(),
SystemKeyspace.bootstrapInProgress(),
@@ -800,7 +800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- tokens = new ArrayList<Token>(initialTokens.size());
+ tokens = new ArrayList<>(initialTokens.size());
for (String token : initialTokens)
tokens.add(getPartitioner().getTokenFactory().fromString(token));
logger.info("Saved tokens not found. Using configuration value: {}", tokens);
@@ -1054,7 +1054,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
{
/* All the ranges for the tokens */
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
{
map.put(entry.getKey().asList(), stringify(entry.getValue()));
@@ -1085,10 +1085,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
{
/* All the ranges for the tokens */
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
{
- List<String> rpcaddrs = new ArrayList<String>(entry.getValue().size());
+ List<String> rpcaddrs = new ArrayList<>(entry.getValue().size());
for (InetAddress endpoint: entry.getValue())
{
rpcaddrs.add(getRpcaddress(endpoint));
@@ -1105,10 +1105,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
- Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ Map<List<String>, List<String>> map = new HashMap<>();
for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(keyspace).entrySet())
{
- List<InetAddress> l = new ArrayList<InetAddress>(entry.getValue());
+ List<InetAddress> l = new ArrayList<>(entry.getValue());
map.put(entry.getKey().asList(), stringify(l));
}
return map;
@@ -1189,7 +1189,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IOException(e.getMessage());
}
- List<String> result = new ArrayList<String>(tokenRanges.size());
+ List<String> result = new ArrayList<>(tokenRanges.size());
for (TokenRange tokenRange : tokenRanges)
result.add(tokenRange.toString());
@@ -1227,7 +1227,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy)
throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace);
- List<TokenRange> ranges = new ArrayList<TokenRange>();
+ List<TokenRange> ranges = new ArrayList<>();
Token.TokenFactory tf = getPartitioner().getTokenFactory();
Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
@@ -1239,9 +1239,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Range range = entry.getKey();
List<InetAddress> addresses = entry.getValue();
- List<String> endpoints = new ArrayList<String>(addresses.size());
- List<String> rpc_endpoints = new ArrayList<String>(addresses.size());
- List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>(addresses.size());
+ List<String> endpoints = new ArrayList<>(addresses.size());
+ List<String> rpc_endpoints = new ArrayList<>(addresses.size());
+ List<EndpointDetails> epDetails = new ArrayList<>(addresses.size());
for (InetAddress endpoint : addresses)
{
@@ -1270,8 +1270,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
// in order to preserve tokens in ascending order, we use LinkedHashMap here
- Map<String, String> mapString = new LinkedHashMap<String, String>(mapInetAddress.size());
- List<Token> tokens = new ArrayList<Token>(mapInetAddress.keySet());
+ Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size());
+ List<Token> tokens = new ArrayList<>(mapInetAddress.keySet());
Collections.sort(tokens);
for (Token token : tokens)
{
@@ -1287,7 +1287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, String> getHostIdMap()
{
- Map<String, String> mapOut = new HashMap<String, String>();
+ Map<String, String> mapOut = new HashMap<>();
for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString());
return mapOut;
@@ -1301,7 +1301,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
- Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<Range<Token>, List<InetAddress>>();
+ Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>();
for (Range<Token> range : ranges)
{
rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
@@ -1358,20 +1358,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
String moveName = pieces[0];
- if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
- handleStateBootstrap(endpoint);
- else if (moveName.equals(VersionedValue.STATUS_NORMAL))
- handleStateNormal(endpoint);
- else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
- handleStateRemoving(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_LEAVING))
- handleStateLeaving(endpoint);
- else if (moveName.equals(VersionedValue.STATUS_LEFT))
- handleStateLeft(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_MOVING))
- handleStateMoving(endpoint, pieces);
- else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
- handleStateRelocating(endpoint, pieces);
+ switch (moveName)
+ {
+ case VersionedValue.STATUS_BOOTSTRAPPING:
+ handleStateBootstrap(endpoint);
+ break;
+ case VersionedValue.STATUS_NORMAL:
+ handleStateNormal(endpoint);
+ break;
+ case VersionedValue.REMOVING_TOKEN:
+ case VersionedValue.REMOVED_TOKEN:
+ handleStateRemoving(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_LEAVING:
+ handleStateLeaving(endpoint);
+ break;
+ case VersionedValue.STATUS_LEFT:
+ handleStateLeft(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_MOVING:
+ handleStateMoving(endpoint, pieces);
+ break;
+ case VersionedValue.STATUS_RELOCATING:
+ handleStateRelocating(endpoint, pieces);
+ break;
+ }
}
else
{
@@ -1477,10 +1488,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokens = getTokensFor(endpoint);
- Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
- Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<Token>();
- Set<Token> localTokensToRemove = new HashSet<Token>();
- Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>();
+ Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+ Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+ Set<Token> localTokensToRemove = new HashSet<>();
+ Set<InetAddress> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
@@ -1704,7 +1715,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
assert pieces.length >= 2;
- List<Token> tokens = new ArrayList<Token>(pieces.length - 1);
+ List<Token> tokens = new ArrayList<>(pieces.length - 1);
for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length))
tokens.add(getPartitioner().getTokenFactory().fromString(tStr));
@@ -1889,12 +1900,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
- final InetAddress myAddress = FBUtilities.getBroadcastAddress();
+ InetAddress myAddress = FBUtilities.getBroadcastAddress();
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
- Set<Range<Token>> myNewRanges = new HashSet<Range<Token>>();
+ Set<Range<Token>> myNewRanges = new HashSet<>();
for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries())
{
if (entry.getValue().equals(myAddress))
@@ -1908,11 +1919,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
StreamPlan stream = new StreamPlan("Restore replica count");
- for (final String keyspaceName : rangesToFetch.keySet())
+ for (String keyspaceName : rangesToFetch.keySet())
{
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
{
- final InetAddress source = entry.getKey();
+ InetAddress source = entry.getKey();
Collection<Range<Token>> ranges = entry.getValue();
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
@@ -1945,7 +1956,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (logger.isDebugEnabled())
logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", "));
- Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<Range<Token>, List<InetAddress>>();
+ Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>();
// Find (for each range) all nodes that store replicas for these ranges as well
for (Range<Token> range : ranges)
@@ -2051,7 +2062,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, String> getLoadMap()
{
- Map<String, String> map = new HashMap<String, String>();
+ Map<String, String> map = new HashMap<>();
for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
{
map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
@@ -2087,7 +2098,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<String> getTokens(InetAddress endpoint)
{
- List<String> strTokens = new ArrayList<String>();
+ List<String> strTokens = new ArrayList<>();
for (Token tok : getTokenMetadata().getTokens(endpoint))
strTokens.add(tok.toString());
return strTokens;
@@ -2110,7 +2121,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getMovingNodes()
{
- List<String> endpoints = new ArrayList<String>();
+ List<String> endpoints = new ArrayList<>();
for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints())
{
@@ -2155,7 +2166,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<String> stringify(Iterable<InetAddress> endpoints)
{
- List<String> stringEndpoints = new ArrayList<String>();
+ List<String> stringEndpoints = new ArrayList<>();
for (InetAddress ep : endpoints)
{
stringEndpoints.add(ep.getHostAddress());
@@ -2235,7 +2246,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- ArrayList<Keyspace> t = new ArrayList<Keyspace>(keyspaceNames.length);
+ ArrayList<Keyspace> t = new ArrayList<>(keyspaceNames.length);
for (String keyspaceName : keyspaceNames)
t.add(getValidKeyspace(keyspaceName));
keyspaces = t;
@@ -2319,15 +2330,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<String, TabularData> getSnapshotDetails()
{
- final Map<String, TabularData> snapshotMap = new HashMap<>();
- for (final Keyspace keyspace : Keyspace.all())
+ Map<String, TabularData> snapshotMap = new HashMap<>();
+ for (Keyspace keyspace : Keyspace.all())
{
if (Keyspace.SYSTEM_KS.equals(keyspace.getName()))
continue;
- for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
+ for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
{
- for (final Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
+ for (Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
{
TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey());
if (data == null)
@@ -2346,12 +2357,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public long trueSnapshotsSize()
{
long total = 0;
- for (final Keyspace keyspace : Keyspace.all())
+ for (Keyspace keyspace : Keyspace.all())
{
if (Keyspace.SYSTEM_KS.equals(keyspace.getName()))
continue;
- for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
+ for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
{
total += cfStore.trueSnapshotsSize();
}
@@ -2419,7 +2430,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (idxName != null)
{
- Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<String>(Arrays.asList(cfName)));
+ Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<>(Arrays.asList(cfName)));
if (indexes.isEmpty())
logger.warn(String.format("Invalid column family index specified: %s/%s. Proceeding with others.", baseCfName, idxName));
else
@@ -2449,7 +2460,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param columnFamilies
* @throws IOException
*/
- public void forceKeyspaceFlush(final String keyspaceName, final String... columnFamilies) throws IOException
+ public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
{
@@ -2472,19 +2483,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
+ int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start();
@@ -2492,9 +2503,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
@@ -2503,7 +2514,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
- final int cmd = nextRepairCommand.incrementAndGet();
+ int cmd = nextRepairCommand.incrementAndGet();
if (!FBUtilities.isUnix() && isSequential)
{
logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
@@ -2513,33 +2524,33 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, final String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd,
- final String keyspace,
- final Collection<Range<Token>> ranges,
- final boolean isSequential,
- final boolean isLocal,
- final boolean fullRepair,
- final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(int cmd,
+ String keyspace,
+ Collection<Range<Token>> ranges,
+ boolean isSequential,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
{
Set<String> dataCenters = null;
if (isLocal)
@@ -2657,9 +2668,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
- public RepairFuture forceKeyspaceRepair(final UUID parentRepairSession,
- final Range<Token> range,
- final String keyspaceName,
+ public RepairFuture forceKeyspaceRepair(UUID parentRepairSession,
+ Range<Token> range,
+ String keyspaceName,
boolean isSequential,
Set<InetAddress> endpoints,
String ... columnFamilies) throws IOException
@@ -2698,13 +2709,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep)
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
- Collection<Range<Token>> primaryRanges = new HashSet<Range<Token>>();
+ Collection<Range<Token>> primaryRanges = new HashSet<>();
TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
for (Token token : metadata.sortedTokens())
{
List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
- primaryRanges.add(new Range<Token>(metadata.getPredecessor(token), token));
+ primaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
}
return primaryRanges;
}
@@ -2750,13 +2761,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (sortedTokens.isEmpty())
return Collections.emptyList();
int size = sortedTokens.size();
- List<Range<Token>> ranges = new ArrayList<Range<Token>>(size + 1);
+ List<Range<Token>> ranges = new ArrayList<>(size + 1);
for (int i = 1; i < size; ++i)
{
- Range<Token> range = new Range<Token>(sortedTokens.get(i - 1), sortedTokens.get(i));
+ Range<Token> range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i));
ranges.add(range);
}
- Range<Token> range = new Range<Token>(sortedTokens.get(size - 1), sortedTokens.get(0));
+ Range<Token> range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0));
ranges.add(range);
return ranges;
@@ -2811,7 +2822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
{
List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
- List<InetAddress> liveEps = new ArrayList<InetAddress>(endpoints.size());
+ List<InetAddress> liveEps = new ArrayList<>(endpoints.size());
for (InetAddress endpoint : endpoints)
{
@@ -2843,9 +2854,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
long totalRowCountEstimate = cfs.estimatedKeysForRange(range);
// splitCount should be much smaller than number of key samples, to avoid huge sampling error
- final int minSamplesPerSplit = 4;
- final int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
- final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
+ int minSamplesPerSplit = 4;
+ int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
+ int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
List<Token> tokens = keysToTokens(range, keys);
return getSplits(tokens, splitCount, cfs);
@@ -2853,7 +2864,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount, ColumnFamilyStore cfs)
{
- final double step = (double) (tokens.size() - 1) / splitCount;
+ double step = (double) (tokens.size() - 1) / splitCount;
Token prevToken = tokens.get(0);
List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount);
for (int i = 1; i <= splitCount; i++)
@@ -2879,7 +2890,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private List<DecoratedKey> keySamples(Iterable<ColumnFamilyStore> cfses, Range<Token> range)
{
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ List<DecoratedKey> keys = new ArrayList<>();
for (ColumnFamilyStore cfs : cfses)
Iterables.addAll(keys, cfs.keySamples(range));
FBUtilities.sortSampledKeys(keys, range);
@@ -2943,11 +2954,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
}
- private void unbootstrap(final Runnable onFinish)
+ private void unbootstrap(Runnable onFinish)
{
- Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<String, Multimap<Range<Token>, InetAddress>>();
+ Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
- for (final String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+ for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress());
@@ -2984,7 +2995,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Futures.immediateFuture(null);
// gather all live nodes in the cluster that aren't also leaving
- List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
+ List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
candidates.remove(FBUtilities.getBroadcastAddress());
for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
{
@@ -3006,7 +3017,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// stream all hints -- range list will be a singleton of "the entire ring"
Token token = StorageService.getPartitioner().getMinimumToken();
- List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
+ List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
Keyspace.SYSTEM_KS,
@@ -3097,7 +3108,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private class RangeRelocator
{
- private StreamPlan streamPlan = new StreamPlan("Bootstrap");
+ private final StreamPlan streamPlan = new StreamPlan("Bootstrap");
private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames)
{
@@ -3189,7 +3200,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void relocate(Collection<String> srcTokens) throws IOException
{
- List<Token> tokens = new ArrayList<Token>(srcTokens.size());
+ List<Token> tokens = new ArrayList<>(srcTokens.size());
try
{
for (String srcT : srcTokens)
@@ -3213,7 +3224,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
assert srcTokens != null;
InetAddress localAddress = FBUtilities.getBroadcastAddress();
Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress);
- Set<Token> tokens = new HashSet<Token>(srcTokens);
+ Set<Token> tokens = new HashSet<>(srcTokens);
Iterator<Token> it = tokens.iterator();
while (it.hasNext())
@@ -3469,7 +3480,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
totalCFs += keyspace.getColumnFamilyStores().size();
remainingCFs = totalCFs;
// flush
- List<Future<?>> flushes = new ArrayList<Future<?>>();
+ List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
@@ -3543,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<Token> sortedTokens = tokenMetadata.sortedTokens();
// describeOwnership returns tokens in an unspecified order, let's re-order them
Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens));
- Map<InetAddress, Float> nodeMap = new LinkedHashMap<InetAddress, Float>();
+ Map<InetAddress, Float> nodeMap = new LinkedHashMap<>();
for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
{
InetAddress endpoint = tokenMetadata.getEndpoint(entry.getKey());
@@ -3577,9 +3588,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
- Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>();
+ Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<>();
// mapping of dc's to nodes, use sorted map so that we get dcs sorted
- SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
+ SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<>();
sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
endpointsGroupedByDc.add(endpoints);
@@ -3625,7 +3636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getKeyspaces()
{
- List<String> keyspaceNamesList = new ArrayList<String>(Schema.instance.getKeyspaces());
+ List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getKeyspaces());
return Collections.unmodifiableList(keyspaceNamesList);
}
@@ -3668,10 +3679,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
- private Future<StreamState> streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
+ private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
{
// First, we build a list of ranges to stream to each host, per table
- final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<String, Map<InetAddress, List<Range<Token>>>>();
+ Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>();
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet())
{
String keyspace = entry.getKey();
@@ -3680,16 +3691,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (rangesWithEndpoints.isEmpty())
continue;
- Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>();
- for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
+ Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>();
+ for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
{
- final Range<Token> range = endPointEntry.getKey();
- final InetAddress endpoint = endPointEntry.getValue();
+ Range<Token> range = endPointEntry.getKey();
+ InetAddress endpoint = endPointEntry.getValue();
List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
if (curRanges == null)
{
- curRanges = new LinkedList<Range<Token>>();
+ curRanges = new LinkedList<>();
rangesPerEndpoint.put(endpoint, curRanges);
}
curRanges.add(range);
@@ -3701,13 +3712,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet())
{
- final String keyspaceName = entry.getKey();
- final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
+ String keyspaceName = entry.getKey();
+ Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
- for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
+ for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
{
- final List<Range<Token>> ranges = rangesEntry.getValue();
- final InetAddress newEndpoint = rangesEntry.getKey();
+ List<Range<Token>> ranges = rangesEntry.getValue();
+ InetAddress newEndpoint = rangesEntry.getKey();
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
@@ -3726,8 +3737,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated)
{
- Set<Range<Token>> toStream = new HashSet<Range<Token>>();
- Set<Range<Token>> toFetch = new HashSet<Range<Token>>();
+ Set<Range<Token>> toStream = new HashSet<>();
+ Set<Range<Token>> toFetch = new HashSet<>();
for (Range r1 : current)
@@ -3846,14 +3857,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public List<String> sampleKeyRange() // do not rename to getter - see CASSANDRA-4452 for details
{
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ List<DecoratedKey> keys = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (Range<Token> range : getPrimaryRangesForEndpoint(keyspace.getName(), FBUtilities.getBroadcastAddress()))
keys.addAll(keySamples(keyspace.getColumnFamilyStores(), range));
}
- List<String> sampledKeys = new ArrayList<String>(keys.size());
+ List<String> sampledKeys = new ArrayList<>(keys.size());
for (DecoratedKey key : keys)
sampledKeys.add(key.getToken().toString());
return sampledKeys;