You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/05/04 02:34:59 UTC
[1/2] git commit: Move BatchlogEndpointSelector to BatchlogManager.EndpointFilter + minor refactorings
Repository: cassandra
Updated Branches:
refs/heads/trunk aca945d79 -> f7df2fec8
Move BatchlogEndpointSelector to BatchlogManager.EndpointFilter + minor refactorings
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f486193a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f486193a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f486193a
Branch: refs/heads/trunk
Commit: f486193ab5654881986a7f7b655fee3aedabad15
Parents: f12a8a3
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun May 4 03:33:03 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun May 4 03:33:03 2014 +0300
----------------------------------------------------------------------
.../apache/cassandra/db/BatchlogManager.java | 80 ++++++++++++-
.../service/BatchlogEndpointSelector.java | 110 -----------------
.../apache/cassandra/service/StorageProxy.java | 7 +-
.../service/BatchlogEndpointFilterTest.java | 117 +++++++++++++++++++
.../service/BatchlogEndpointSelectorTest.java | 116 ------------------
5 files changed, 197 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index fee686b..de51c8d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -30,7 +30,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -364,4 +364,82 @@ public class BatchlogManager implements BatchlogManagerMBean
{
return QueryProcessor.processInternal(String.format(format, args));
}
+
+ public static class EndpointFilter
+ {
+ private final String localRack;
+ private final Multimap<String, InetAddress> endpoints;
+
+ public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ this.localRack = localRack;
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ */
+ public Collection<InetAddress> filter()
+ {
+ // special case for single-node data centers
+ if (endpoints.values().size() == 1)
+ return endpoints.values();
+
+ // strip out dead endpoints and localhost
+ ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+ for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+ if (isValid(entry.getValue()))
+ validated.put(entry.getKey(), entry.getValue());
+
+ if (validated.size() <= 2)
+ return validated.values();
+
+ if (validated.size() - validated.get(localRack).size() >= 2)
+ {
+ // we have enough endpoints in other racks
+ validated.removeAll(localRack);
+ }
+
+ if (validated.keySet().size() == 1)
+ {
+ // we have only 1 `other` rack
+ Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+ return Lists.newArrayList(Iterables.limit(otherRack, 2));
+ }
+
+ // randomize which racks we pick from if more than 2 remaining
+ Collection<String> racks;
+ if (validated.keySet().size() == 2)
+ {
+ racks = validated.keySet();
+ }
+ else
+ {
+ racks = Lists.newArrayList(validated.keySet());
+ Collections.shuffle((List) racks);
+ }
+
+ // grab a random member of up to two racks
+ List<InetAddress> result = new ArrayList<>(2);
+ for (String rack : Iterables.limit(racks, 2))
+ {
+ List<InetAddress> rackMembers = validated.get(rack);
+ result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+ }
+
+ return result;
+ }
+
+ @VisibleForTesting
+ protected boolean isValid(InetAddress input)
+ {
+ return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+ }
+
+ @VisibleForTesting
+ protected int getRandomInt(int bound)
+ {
+ return FBUtilities.threadLocalRandom().nextInt(bound);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
deleted file mode 100644
index bf032f5..0000000
--- a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.utils.FBUtilities;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-
-public class BatchlogEndpointSelector
-{
- private final String localRack;
-
- public BatchlogEndpointSelector(String localRack)
- {
- this.localRack = localRack;
- }
-
- /**
- * @param endpoints nodes in the local datacenter, grouped by rack name
- * @return list of candidates for batchlog hosting. if possible these will be two nodes from different racks.
- */
- public Collection<InetAddress> chooseEndpoints(Multimap<String, InetAddress> endpoints)
- {
- // strip out dead endpoints and localhost
- ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
- for (Map.Entry<String, InetAddress> entry : endpoints.entries())
- {
- if (isValid(entry.getValue()))
- validated.put(entry.getKey(), entry.getValue());
- }
- if (validated.size() <= 2)
- return validated.values();
-
- if ((validated.size() - validated.get(localRack).size()) >= 2)
- {
- // we have enough endpoints in other racks
- validated.removeAll(localRack);
- }
-
- if (validated.keySet().size() == 1)
- {
- // we have only 1 `other` rack
- Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
- return Lists.newArrayList(Iterables.limit(otherRack, 2));
- }
-
- // randomize which racks we pick from if more than 2 remaining
- Collection<String> racks;
- if (validated.keySet().size() == 2)
- {
- racks = validated.keySet();
- }
- else
- {
- racks = Lists.newArrayList(validated.keySet());
- Collections.shuffle((List) racks);
- }
-
- // grab a random member of up to two racks
- List<InetAddress> result = new ArrayList<>(2);
- for (String rack : Iterables.limit(racks, 2))
- {
- List<InetAddress> rackMembers = validated.get(rack);
- result.add(rackMembers.get(getRandomInt(rackMembers.size())));
- }
-
- return result;
- }
-
- @VisibleForTesting
- protected boolean isValid(InetAddress input)
- {
- return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
- }
-
- @VisibleForTesting
- protected int getRandomInt(int bound)
- {
- return FBUtilities.threadLocalRandom().nextInt(bound);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 04c3641..d01dd99 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -741,14 +741,9 @@ public class StorageProxy implements StorageProxyMBean
{
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
-
- // special case for single-node datacenters
- if (localEndpoints.size() == 1)
- return localEndpoints.values();
-
String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
- Collection<InetAddress> chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints);
+ Collection<InetAddress> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
if (chosenEndpoints.isEmpty())
{
if (consistencyLevel == ConsistencyLevel.ANY)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
new file mode 100644
index 0000000..72e8df5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.db.BatchlogManager;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+
+public class BatchlogEndpointFilterTest
+{
+ private static final String LOCAL = "local";
+
+ @Test
+ public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("2", InetAddress.getByName("2"))
+ .put("2", InetAddress.getByName("22"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(2));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+ }
+
+ @Test
+ public void shouldSelectHostFromLocal() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(2));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ }
+
+ @Test
+ public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(1));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ }
+
+ @Test
+ public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("1", InetAddress.getByName("111"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(2));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+ }
+
+ private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+ {
+ public TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ super(localRack, endpoints);
+ }
+
+ @Override
+ protected boolean isValid(InetAddress input)
+ {
+ // We will use always alive non-localhost endpoints
+ return true;
+ }
+
+ @Override
+ protected int getRandomInt(int bound)
+ {
+ // We don't need random behavior here
+ return bound - 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
deleted file mode 100644
index 293078d..0000000
--- a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-
-import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
-
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-
-public class BatchlogEndpointSelectorTest
-{
- private final BatchlogEndpointSelector target;
- private static final String LOCAL = "local";
-
-
- public BatchlogEndpointSelectorTest() throws UnknownHostException
- {
- target = new BatchlogEndpointSelector(LOCAL)
- {
- @Override
- protected boolean isValid(InetAddress input)
- {
- //we will use always alive non-localhost endpoints
- return true;
- }
-
- @Override
- protected int getRandomInt(int bound)
- {
- //we don't need a random behavior here
- return bound - 1;
- }
- };
- }
-
- @Test
- public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("2", InetAddress.getByName("2"))
- .put("2", InetAddress.getByName("22"))
- .build();
- Collection<InetAddress> result = target.chooseEndpoints(endpoints);
- assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
- }
-
- @Test
- public void shouldSelectHostFromLocal() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .build();
- Collection<InetAddress> result = target.chooseEndpoints(endpoints);
- assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
- }
-
- @Test
- public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .build();
- Collection<InetAddress> result = target.chooseEndpoints(endpoints);
- assertThat(result.size(), is(1));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
- }
-
- @Test
- public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("1", InetAddress.getByName("111"))
- .build();
- Collection<InetAddress> result = target.chooseEndpoints(endpoints);
- assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
- }
-}
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7df2fec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7df2fec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7df2fec
Branch: refs/heads/trunk
Commit: f7df2fec8464a587aed963e642352454f94dd0ff
Parents: aca945d f486193
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun May 4 03:33:51 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun May 4 03:33:51 2014 +0300
----------------------------------------------------------------------
.../apache/cassandra/db/BatchlogManager.java | 80 ++++++++++++-
.../service/BatchlogEndpointSelector.java | 110 -----------------
.../apache/cassandra/service/StorageProxy.java | 7 +-
.../service/BatchlogEndpointFilterTest.java | 117 +++++++++++++++++++
.../service/BatchlogEndpointSelectorTest.java | 116 ------------------
5 files changed, 197 insertions(+), 233 deletions(-)
----------------------------------------------------------------------