You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/05/03 05:49:54 UTC
[5/9] git commit: Make batchlog replica selection rack-aware patch by
Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551
Make batchlog replica selection rack-aware
patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af96d405
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af96d405
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af96d405
Branch: refs/heads/cassandra-2.1
Commit: af96d405b42a9e4ae23cba841b7a5d83ee8f7ec8
Parents: fab4557
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri May 2 22:47:27 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri May 2 22:47:27 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../service/BatchlogEndpointSelector.java | 110 ++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 21 +---
.../service/BatchlogEndpointSelectorTest.java | 116 +++++++++++++++++++
4 files changed, 232 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ab1a5f..5799659 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
2.0.8
-=======
+ * Make batchlog replica selection rack-aware (CASSANDRA-6551)
* Add Google Compute Engine snitch (CASSANDRA-7132)
* Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
* Set JMX RMI port to 7199 (CASSANDRA-7087)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
new file mode 100644
index 0000000..bf032f5
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.utils.FBUtilities;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/** Selects the endpoints that will host a batchlog copy, given the rack of the node doing the selection. */
+public class BatchlogEndpointSelector
+{
+    private final String localRack;
+
+    public BatchlogEndpointSelector(String localRack)
+    {
+        this.localRack = localRack;
+    }
+
+    /**
+     * Picks batchlog hosts among the valid endpoints (see {@link #isValid}). With two or fewer valid endpoints
+     * all of them are returned; otherwise the local rack is skipped whenever other racks hold at least two.
+     * @param endpoints nodes in the local datacenter, grouped by rack name
+     * @return list of candidates for batchlog hosting. if possible these will be two nodes from different racks.
+     */
+    public Collection<InetAddress> chooseEndpoints(Multimap<String, InetAddress> endpoints)
+    {
+        // strip out dead endpoints and localhost
+        ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+        for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+        {
+            if (isValid(entry.getValue()))
+                validated.put(entry.getKey(), entry.getValue());
+        }
+        if (validated.size() <= 2)
+            return validated.values();
+
+        // we have enough endpoints in other racks, so the local rack can be left out
+        if ((validated.size() - validated.get(localRack).size()) >= 2)
+            validated.removeAll(localRack);
+
+        if (validated.keySet().size() == 1)
+        {
+            // only 1 rack left (an `other` one, or the local one if it alone has valid nodes): take its first 2 nodes
+            Collection<InetAddress> onlyRack = Iterables.getOnlyElement(validated.asMap().values());
+            return Lists.newArrayList(Iterables.limit(onlyRack, 2));
+        }
+
+        // randomize which racks we pick from if more than 2 remaining
+        Collection<String> racks = validated.keySet();
+        if (racks.size() > 2)
+        {
+            List<String> shuffled = Lists.newArrayList(racks);
+            Collections.shuffle(shuffled);
+            racks = shuffled;
+        }
+
+        // grab a random member of up to two racks
+        List<InetAddress> result = new ArrayList<>(2);
+        for (String rack : Iterables.limit(racks, 2))
+        {
+            List<InetAddress> rackMembers = validated.get(rack);
+            result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+        }
+
+        return result;
+    }
+
+    /** @return true if the endpoint is not this node's broadcast address and the failure detector reports it alive */
+    @VisibleForTesting
+    protected boolean isValid(InetAddress input)
+    {
+        return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+    }
+
+    /** @return the thread-local random's {@code nextInt(bound)}, used as an index into a rack's member list */
+    @VisibleForTesting
+    protected int getRandomInt(int bound)
+    {
+        return FBUtilities.threadLocalRandom().nextInt(bound);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 14d5ee2..2bf8e7f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -729,24 +728,14 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException
{
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
- List<InetAddress> localEndpoints = new ArrayList<>(topology.getDatacenterEndpoints().get(localDataCenter));
-
+ Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+
// special case for single-node datacenters
if (localEndpoints.size() == 1)
- return localEndpoints;
-
- List<InetAddress> chosenEndpoints = new ArrayList<>(2);
- int startOffset = new Random().nextInt(localEndpoints.size());
+ return localEndpoints.values();
- // starts at some random point in the list, advances forward until the end, then loops
- // around to the beginning, advancing again until it is back at the starting point again.
- for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++)
- {
- InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
- // skip localhost and non-alive nodes
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint))
- chosenEndpoints.add(endpoint);
- }
+ String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+ Collection<InetAddress> chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints);
if (chosenEndpoints.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
new file mode 100644
index 0000000..293078d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+public class BatchlogEndpointSelectorTest
+{
+ private final BatchlogEndpointSelector target; // selector under test, stubbed in the constructor below
+ private static final String LOCAL = "local"; // rack name handed to the selector as its local rack
+
+ // Builds a selector whose validity check and random index are stubbed, so every expectation is deterministic.
+ public BatchlogEndpointSelectorTest() throws UnknownHostException
+ {
+ target = new BatchlogEndpointSelector(LOCAL)
+ {
+ @Override
+ protected boolean isValid(InetAddress input)
+ {
+ // every endpoint is accepted: the test addresses stand for alive, non-localhost nodes
+ return true;
+ }
+
+ @Override
+ protected int getRandomInt(int bound)
+ {
+ // no randomness wanted here: always answer with the last index of the rack's member list
+ return bound - 1;
+ }
+ };
+ }
+
+ @Test
+ public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("2", InetAddress.getByName("2"))
+ .put("2", InetAddress.getByName("22"))
+ .build();
+ Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+ assertThat(result.size(), is(2)); // one node per non-local rack; the stub picks the last member of each
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+ }
+
+ @Test
+ public void shouldSelectHostFromLocal() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .build();
+ Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+ assertThat(result.size(), is(2)); // only one node lives outside the local rack, so the local rack supplies the other
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); // NOTE(review): "0" and "00" presumably parse to the same address, so this cannot tell the two local nodes apart - confirm
+ }
+
+ @Test
+ public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .build();
+ Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+ assertThat(result.size(), is(1)); // a single endpoint is handed back even though it sits in the local rack
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ }
+
+ @Test
+ public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("1", InetAddress.getByName("111"))
+ .build();
+ Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+ assertThat(result.size(), is(2)); // rack "1" is the only non-local rack; its first two members (as put above) are expected
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+ }
+}