You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/05/03 05:49:54 UTC

[5/9] git commit: Make batchlog replica selection rack-aware; patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551

Make batchlog replica selection rack-aware
patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af96d405
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af96d405
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af96d405

Branch: refs/heads/cassandra-2.1
Commit: af96d405b42a9e4ae23cba841b7a5d83ee8f7ec8
Parents: fab4557
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri May 2 22:47:27 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri May 2 22:47:27 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../service/BatchlogEndpointSelector.java       | 110 ++++++++++++++++++
 .../apache/cassandra/service/StorageProxy.java  |  21 +---
 .../service/BatchlogEndpointSelectorTest.java   | 116 +++++++++++++++++++
 4 files changed, 232 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ab1a5f..5799659 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
 2.0.8
-=======
+ * Make batchlog replica selection rack-aware (CASSANDRA-6551)
  * Add Google Compute Engine snitch (CASSANDRA-7132)
  * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
  * Set JMX RMI port to 7199 (CASSANDRA-7087)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
new file mode 100644
index 0000000..bf032f5
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.utils.FBUtilities;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Chooses which nodes of the local datacenter will host a batchlog entry.
+ *
+ * Endpoints rejected by {@link #isValid} (this node itself, or nodes the failure detector
+ * reports as down) are never chosen. When the other racks can supply two endpoints, the local
+ * rack is skipped, and when more than one rack remains the two endpoints come from two
+ * different racks.
+ */
+public class BatchlogEndpointSelector
+{
+    /** Rack of this node; its members are skipped when other racks can supply two endpoints. */
+    private final String localRack;
+
+    public BatchlogEndpointSelector(String localRack)
+    {
+        this.localRack = localRack;
+    }
+
+    /**
+     * @param endpoints nodes in the local datacenter, grouped by rack name
+     * @return list of candidates for batchlog hosting: at most two nodes, taken from two
+     *         different racks when possible. Empty if no endpoint passes {@link #isValid}.
+     */
+    public Collection<InetAddress> chooseEndpoints(Multimap<String, InetAddress> endpoints)
+    {
+        // strip out dead endpoints and localhost
+        ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+        for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+        {
+            if (isValid(entry.getValue()))
+                validated.put(entry.getKey(), entry.getValue());
+        }
+
+        // nothing to choose between: hand back every candidate (as a copy, not a view of
+        // our working multimap, to match the other branches)
+        if (validated.size() <= 2)
+            return new ArrayList<>(validated.values());
+
+        if ((validated.size() - validated.get(localRack).size()) >= 2)
+        {
+            // the other racks can supply both endpoints by themselves, so avoid the local rack
+            validated.removeAll(localRack);
+        }
+
+        if (validated.keySet().size() == 1)
+        {
+            // Exactly one rack remains: either a single non-local rack, or the local rack when
+            // no other rack has a valid endpoint. Either way it holds at least two endpoints here.
+            // NOTE(review): this always takes the first two members in iteration order rather
+            // than a random pair; the unit tests currently pin that order.
+            Collection<InetAddress> onlyRack = Iterables.getOnlyElement(validated.asMap().values());
+            return Lists.newArrayList(Iterables.limit(onlyRack, 2));
+        }
+
+        // at least two racks remain; randomize which two we use when there are more than two
+        List<String> racks = Lists.newArrayList(validated.keySet());
+        if (racks.size() > 2)
+            Collections.shuffle(racks);
+
+        // grab a random member from each of the first two racks
+        List<InetAddress> result = new ArrayList<>(2);
+        for (String rack : Iterables.limit(racks, 2))
+        {
+            List<InetAddress> rackMembers = validated.get(rack);
+            result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+        }
+
+        return result;
+    }
+
+    /**
+     * @return true if {@code input} may host a batchlog: it is not this node's broadcast
+     *         address and the failure detector currently reports it alive. Overridable for tests.
+     */
+    @VisibleForTesting
+    protected boolean isValid(InetAddress input)
+    {
+        return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+    }
+
+    /**
+     * @return a random value used as an index into a list of size {@code bound}; overridable so
+     *         tests can make the choice deterministic.
+     */
+    @VisibleForTesting
+    protected int getRandomInt(int bound)
+    {
+        return FBUtilities.threadLocalRandom().nextInt(bound);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 14d5ee2..2bf8e7f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.Random;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -729,24 +728,14 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
-        List<InetAddress> localEndpoints = new ArrayList<>(topology.getDatacenterEndpoints().get(localDataCenter));
-
+        Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        
         // special case for single-node datacenters
         if (localEndpoints.size() == 1)
-            return localEndpoints;
-
-        List<InetAddress> chosenEndpoints = new ArrayList<>(2);
-        int startOffset = new Random().nextInt(localEndpoints.size());
+            return localEndpoints.values();
 
-        // starts at some random point in the list, advances forward until the end, then loops
-        // around to the beginning, advancing again until it is back at the starting point again.
-        for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++)
-        {
-            InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
-            // skip localhost and non-alive nodes
-            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint))
-                chosenEndpoints.add(endpoint);
-        }
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+        Collection<InetAddress> chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints);
 
         if (chosenEndpoints.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
new file mode 100644
index 0000000..293078d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Unit tests for {@link BatchlogEndpointSelector}.
+ *
+ * The selector under test treats every endpoint as valid and replaces the random index with
+ * {@code bound - 1}, so whenever it picks "a random member" of a rack it picks the last one.
+ * Every endpoint uses a distinct dotted-quad literal so assertions can tell the nodes apart
+ * (short literals such as "0" and "00" both parse to 0.0.0.0 and would compare equal).
+ */
+public class BatchlogEndpointSelectorTest
+{
+    private static final String LOCAL = "local";
+
+    private final BatchlogEndpointSelector target;
+
+    public BatchlogEndpointSelectorTest()
+    {
+        target = new BatchlogEndpointSelector(LOCAL)
+        {
+            @Override
+            protected boolean isValid(InetAddress input)
+            {
+                // treat every endpoint as alive and distinct from this node
+                return true;
+            }
+
+            @Override
+            protected int getRandomInt(int bound)
+            {
+                // deterministic stand-in for randomness: always the last index
+                return bound - 1;
+            }
+        };
+    }
+
+    /** Builds an address from an IP literal (no name lookup is performed for literals). */
+    private static InetAddress addr(String ip) throws UnknownHostException
+    {
+        return InetAddress.getByName(ip);
+    }
+
+    @Test
+    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+    {
+        // four endpoints outside the local rack, so the local rack is skipped entirely
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, addr("127.0.0.1"))
+                .put(LOCAL, addr("127.0.0.2"))
+                .put("1", addr("127.0.1.1"))
+                .put("1", addr("127.0.1.2"))
+                .put("2", addr("127.0.2.1"))
+                .put("2", addr("127.0.2.2"))
+                .build();
+        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.1.2")));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.2.2")));
+    }
+
+    @Test
+    public void shouldSelectHostFromLocal() throws UnknownHostException
+    {
+        // only one endpoint outside the local rack, so one host is taken from each rack;
+        // the local pick is the last local member because getRandomInt returns bound - 1
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, addr("127.0.0.1"))
+                .put(LOCAL, addr("127.0.0.2"))
+                .put("1", addr("127.0.1.1"))
+                .build();
+        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.1.1")));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.0.2")));
+    }
+
+    @Test
+    public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, addr("127.0.0.1"))
+                .build();
+        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+        assertThat(result.size(), is(1));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.0.1")));
+    }
+
+    @Test
+    public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
+    {
+        // a single non-local rack with three members: its first two members are used
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, addr("127.0.0.1"))
+                .put(LOCAL, addr("127.0.0.2"))
+                .put("1", addr("127.0.1.1"))
+                .put("1", addr("127.0.1.2"))
+                .put("1", addr("127.0.1.3"))
+                .build();
+        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.1.1")));
+        assertThat(result, JUnitMatchers.hasItem(addr("127.0.1.2")));
+    }
+
+    @Test
+    public void shouldSelectHostsFromTwoDistinctNonLocalRacks() throws UnknownHostException
+    {
+        // three single-node non-local racks: which two racks are used is random, but the two
+        // hosts must come from different racks and never from the local rack
+        InetAddress local1 = addr("127.0.0.1");
+        InetAddress local2 = addr("127.0.0.2");
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, local1)
+                .put(LOCAL, local2)
+                .put("1", addr("127.0.1.1"))
+                .put("2", addr("127.0.2.1"))
+                .put("3", addr("127.0.3.1"))
+                .build();
+        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+        assertThat(result.contains(local1), is(false));
+        assertThat(result.contains(local2), is(false));
+        // each non-local rack holds one node, so two distinct hosts means two distinct racks
+        InetAddress[] chosen = result.toArray(new InetAddress[0]);
+        assertThat(chosen[0].equals(chosen[1]), is(false));
+    }
+}