You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/08/20 19:43:12 UTC
[1/2] git commit: (Hadoop) fix cluster initialisation for a split fetching
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1.0 da4578678 -> 58554decc
(Hadoop) fix cluster initialisation for a split fetching
patch by Jacek Lewandowski; reviewed by Alex Liu for CASSANDRA-7774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe39eb7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe39eb7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe39eb7a
Branch: refs/heads/cassandra-2.1.0
Commit: fe39eb7a9e2b017e3cd31b1c09693c8d565dee18
Parents: 44cfd95
Author: Jacek Lewandowski <le...@gmail.com>
Authored: Wed Aug 20 20:39:12 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:39:12 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 89 +--------
.../cassandra/hadoop/cql3/CqlRecordReader.java | 19 +-
...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17c0671..71cfca0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
* Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
* Fix ALTER clustering column type from DateType to TimestampType when
using DESC clustering order (CASSANRDA-7797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index e894996..137bddf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -288,16 +288,22 @@ public class CqlConfigHelper
public static Cluster getInputCluster(String host, Configuration conf)
{
+ // kept for backward compatibility: delegates to the String[] overload, passing this single host
+ return getInputCluster(new String[] {host}, conf);
+ }
+
+ public static Cluster getInputCluster(String[] hosts, Configuration conf)
+ {
int port = getInputNativePort(conf);
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
- LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+ LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
PoolingOptions poolingOptions = getReadPoolingOptions(conf);
Cluster.Builder builder = Cluster.builder()
- .addContactPoint(host)
+ .addContactPoints(hosts)
.withPort(port)
.withCompression(ProtocolOptions.Compression.NONE);
@@ -480,84 +486,9 @@ public class CqlConfigHelper
return socketOptions;
}
- private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
+ private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String[] stickHosts)
{
- return new LoadBalancingPolicy()
- {
- private Host origHost;
- private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
- @Override
- public void onAdd(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- }
-
- @Override
- public void onDown(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onRemove(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onUp(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- liveRemoteHosts.add(host);
- }
-
- @Override
- public void onSuspected(Host host)
- {
- }
-
- @Override
- public HostDistance distance(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- return HostDistance.LOCAL;
- else
- return HostDistance.REMOTE;
- }
-
- @Override
- public void init(Cluster cluster, Collection<Host> hosts)
- {
- for (Host host : hosts)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- {
- origHost = host;
- break;
- }
- }
- }
-
- @Override
- public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
- {
- if (origHost != null)
- {
- return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
- }
- else
- {
- return liveRemoteHosts.iterator();
- }
- }
- };
+ return new LimitedLocalNodeFirstLocalBalancingPolicy(stickHosts); // query plans use only live hosts among stickHosts, local node first
}
private static Optional<AuthProvider> getAuthProvider(Configuration conf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9167ac3..3eab7c0 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -124,24 +124,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
return;
- // create connection using thrift
+ // create a Cluster instance
String[] locations = split.getLocations();
- Exception lastException = null;
- for (String location : locations)
- {
- try
- {
- cluster = CqlConfigHelper.getInputCluster(location, conf);
- break;
- }
- catch (Exception e)
- {
- lastException = e;
- logger.warn("Failed to create authenticated client to {}", location);
- }
- }
- if (cluster == null && lastException != null)
- throw lastException;
+ cluster = CqlConfigHelper.getInputCluster(locations, conf);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
new file mode 100644
index 0000000..3aa7df0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -0,0 +1,213 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * This load balancing policy is intended to be used only for CqlRecordReader when it fetches a particular split.
+ * <p/>
+ * It chooses alive hosts only from the set of the given replicas - because the connection is used to load the data from
+ * the particular split, with a strictly defined list of the replicas, it is pointless to try the other nodes.
+ * The policy tracks which of the replicas are alive, and when a new query plan is requested, it returns those replicas
+ * in the following order:
+ * <ul>
+ * <li>the local node</li>
+ * <li>the collection of the remaining hosts (which is shuffled on each request)</li>
+ * </ul>
+ * Hosts that are not replicas of the split never appear in a query plan, so {@link #distance(Host)} reports them as
+ * {@link HostDistance#IGNORED} and the driver has no reason to keep connections to them.
+ */
+class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
+{
+    private final static Logger logger = LoggerFactory.getLogger(LimitedLocalNodeFirstLocalBalancingPolicy.class);
+
+    // addresses of all network interfaces of this machine, collected once when the class is loaded
+    private final static Set<InetAddress> localAddresses = Collections.unmodifiableSet(getLocalInetAddresses());
+
+    // replicas currently believed to be alive; copy-on-write so query plans iterate over a stable snapshot
+    private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
+
+    // all addresses the given replica names resolved to; filled by the constructor and only read afterwards
+    private final Set<InetAddress> replicaAddresses = new HashSet<>();
+
+    /**
+     * @param replicas host names or addresses of the split's replicas; names that cannot be resolved are skipped
+     */
+    public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
+    {
+        for (String replica : replicas)
+        {
+            try
+            {
+                InetAddress[] addresses = InetAddress.getAllByName(replica);
+                Collections.addAll(replicaAddresses, addresses);
+            }
+            catch (UnknownHostException e)
+            {
+                logger.warn("Invalid replica host name: {}, skipping it", replica);
+            }
+        }
+        logger.debug("Created instance with the following replicas: {}", Arrays.asList(replicas));
+    }
+
+    /**
+     * Seeds the set of live replicas with those of the initially known hosts that are replicas of the split.
+     */
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        List<Host> replicaHosts = new ArrayList<>();
+        for (Host host : hosts)
+        {
+            if (isReplica(host))
+            {
+                replicaHosts.add(host);
+            }
+        }
+        liveReplicaHosts.addAll(replicaHosts);
+        logger.debug("Initialized with replica hosts: {}", replicaHosts);
+    }
+
+    /**
+     * Local replicas are {@code LOCAL}, other replicas {@code REMOTE}; hosts outside the split are {@code IGNORED}
+     * because {@link #newQueryPlan} never returns them, so the driver need not keep connections to them.
+     */
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (!isReplica(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return isLocalHost(host) ? HostDistance.LOCAL : HostDistance.REMOTE;
+    }
+
+    /**
+     * Returns the live replicas, local ones first, followed by the remaining ones in random order.
+     */
+    @Override
+    public Iterator<Host> newQueryPlan(String keyspace, Statement statement)
+    {
+        List<Host> local = new ArrayList<>(1);
+        List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
+        for (Host liveReplicaHost : liveReplicaHosts)
+        {
+            if (isLocalHost(liveReplicaHost))
+            {
+                local.add(liveReplicaHost);
+            }
+            else
+            {
+                remote.add(liveReplicaHost);
+            }
+        }
+
+        Collections.shuffle(remote);
+
+        logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote);
+
+        return Iterators.concat(local.iterator(), remote.iterator());
+    }
+
+    @Override
+    public void onAdd(Host host)
+    {
+        if (isReplica(host))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("Added a new host {}", host);
+        }
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+        if (isReplica(host))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("The host {} is now up", host);
+        }
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("The host {} is now down", host);
+        }
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("Removed the host {}", host);
+        }
+    }
+
+    @Override
+    public void onSuspected(Host host)
+    {
+        // not supported by this load balancing policy
+    }
+
+    // true if the host's address is one of the addresses the replica names resolved to
+    private boolean isReplica(Host host)
+    {
+        return replicaAddresses.contains(host.getAddress());
+    }
+
+    // true if the host's address is a loopback address or belongs to one of this machine's interfaces
+    private static boolean isLocalHost(Host host)
+    {
+        InetAddress hostAddress = host.getAddress();
+        return hostAddress.isLoopbackAddress() || localAddresses.contains(hostAddress);
+    }
+
+    // collects every address of every network interface; an empty set if they cannot be enumerated
+    private static Set<InetAddress> getLocalInetAddresses()
+    {
+        try
+        {
+            // getNetworkInterfaces() may return null (e.g. on JDK 7 when no interface is found)
+            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            if (interfaces == null)
+            {
+                return Collections.emptySet();
+            }
+            return Sets.newHashSet(Iterators.concat(
+                    Iterators.transform(
+                            Iterators.forEnumeration(interfaces),
+                            new Function<NetworkInterface, Iterator<InetAddress>>()
+                            {
+                                @Override
+                                public Iterator<InetAddress> apply(NetworkInterface netIface)
+                                {
+                                    return Iterators.forEnumeration(netIface.getInetAddresses());
+                                }
+                            })));
+        }
+        catch (SocketException e)
+        {
+            logger.warn("Could not retrieve local network interfaces.", e);
+            return Collections.emptySet();
+        }
+    }
+}
[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58554dec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58554dec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58554dec
Branch: refs/heads/cassandra-2.1.0
Commit: 58554decc22c6754cc6ce4492ada65b5d8fbfcf9
Parents: da45786 fe39eb7
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:42:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:42:48 2014 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------