You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/08/20 19:44:22 UTC

[1/5] git commit: (Hadoop) fix cluster initialisation for a split fetching

Repository: cassandra
Updated Branches:
  refs/heads/trunk 753ccff52 -> 314afb8df


(Hadoop) fix cluster initialisation for a split fetching

patch by Jacek Lewandowski; reviewed by Alex Liu for CASSANDRA-7774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe39eb7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe39eb7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe39eb7a

Branch: refs/heads/trunk
Commit: fe39eb7a9e2b017e3cd31b1c09693c8d565dee18
Parents: 44cfd95
Author: Jacek Lewandowski <le...@gmail.com>
Authored: Wed Aug 20 20:39:12 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:39:12 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  89 +--------
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  19 +-
 ...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
 4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17c0671..71cfca0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
  * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
  * Fix ALTER clustering column type from DateType to TimestampType when
    using DESC clustering order (CASSANDRA-7797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index e894996..137bddf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -288,16 +288,22 @@ public class CqlConfigHelper
 
     public static Cluster getInputCluster(String host, Configuration conf)
     {
+        // this method has been left for backward compatibility
+        return getInputCluster(new String[] {host}, conf);
+    }
+
+    public static Cluster getInputCluster(String[] hosts, Configuration conf)
+    {
         int port = getInputNativePort(conf);
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
-        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
         PoolingOptions poolingOptions = getReadPoolingOptions(conf);
         
         Cluster.Builder builder = Cluster.builder()
-                                         .addContactPoint(host)
+                                         .addContactPoints(hosts)
                                          .withPort(port)
                                          .withCompression(ProtocolOptions.Compression.NONE);
 
@@ -480,84 +486,9 @@ public class CqlConfigHelper
         return socketOptions;
     }
 
-    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
+    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String[] stickHosts)
     {
-        return new LoadBalancingPolicy()
-        {
-            private Host origHost;
-            private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
-            @Override
-            public void onAdd(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-            }
-
-            @Override
-            public void onDown(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onRemove(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onUp(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-                liveRemoteHosts.add(host);
-            }
-
-            @Override
-            public void onSuspected(Host host)
-            {
-            }
-
-            @Override
-            public HostDistance distance(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    return HostDistance.LOCAL;
-                else
-                    return HostDistance.REMOTE;
-            }
-
-            @Override
-            public void init(Cluster cluster, Collection<Host> hosts)
-            {
-                for (Host host : hosts)
-                {
-                    if (host.getAddress().getHostName().equals(stickHost))
-                    {
-                        origHost = host;
-                        break;
-                    }
-                }
-            }
-
-            @Override
-            public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
-            {
-                if (origHost != null)
-                {
-                    return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
-                }
-                else
-                {
-                    return liveRemoteHosts.iterator();
-                }
-            }
-        };
+        return new LimitedLocalNodeFirstLocalBalancingPolicy(stickHosts);
     }
 
     private static Optional<AuthProvider> getAuthProvider(Configuration conf)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9167ac3..3eab7c0 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -124,24 +124,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
             if (cluster != null)
                 return;
 
-            // create connection using thrift
+            // create a Cluster instance
             String[] locations = split.getLocations();
-            Exception lastException = null;
-            for (String location : locations)
-            {
-                try
-                {
-                    cluster = CqlConfigHelper.getInputCluster(location, conf);
-                    break;
-                }
-                catch (Exception e)
-                {
-                    lastException = e;
-                    logger.warn("Failed to create authenticated client to {}", location);
-                }
-            }
-            if (cluster == null && lastException != null)
-                throw lastException;
+            cluster = CqlConfigHelper.getInputCluster(locations, conf);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
new file mode 100644
index 0000000..3aa7df0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -0,0 +1,185 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * This load balancing policy is intended to be used only for CqlRecordReader when it fetches a particular split.
+ * <p/>
+ * It chooses alive hosts only from the set of the given replicas - because the connection is used to load the data from
+ * the particular split, with a strictly defined list of the replicas, it is pointless to try the other nodes.
+ * The policy tracks which of the replicas are alive, and when a new query plan is requested, it returns those replicas
+ * in the following order:
+ * <ul>
+ * <li>the local node</li>
+ * <li>the collection of the remaining hosts (which is shuffled on each request)</li>
+ * </ul>
+ */
+class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
+{
+    private final static Logger logger = LoggerFactory.getLogger(LimitedLocalNodeFirstLocalBalancingPolicy.class);
+
+    private final static Set<InetAddress> localAddresses = Collections.unmodifiableSet(getLocalInetAddresses());
+
+    private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
+
+    private final Set<InetAddress> replicaAddresses = new HashSet<>();
+
+    public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
+    {
+        for (String replica : replicas)
+        {
+            try
+            {
+                InetAddress[] addresses = InetAddress.getAllByName(replica);
+                Collections.addAll(replicaAddresses, addresses);
+            }
+            catch (UnknownHostException e)
+            {
+                logger.warn("Invalid replica host name: {}, skipping it", replica);
+            }
+        }
+        logger.debug("Created instance with the following replicas: {}", Arrays.asList(replicas));
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        List<Host> replicaHosts = new ArrayList<>();
+        for (Host host : hosts)
+        {
+            if (replicaAddresses.contains(host.getAddress()))
+            {
+                replicaHosts.add(host);
+            }
+        }
+        liveReplicaHosts.addAll(replicaHosts);
+        logger.debug("Initialized with replica hosts: {}", replicaHosts);
+    }
+
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (isLocalHost(host))
+        {
+            return HostDistance.LOCAL;
+        }
+        else
+        {
+            return HostDistance.REMOTE;
+        }
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String keyspace, Statement statement)
+    {
+        List<Host> local = new ArrayList<>(1);
+        List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
+        for (Host liveReplicaHost : liveReplicaHosts)
+        {
+            if (isLocalHost(liveReplicaHost))
+            {
+                local.add(liveReplicaHost);
+            }
+            else
+            {
+                remote.add(liveReplicaHost);
+            }
+        }
+
+        Collections.shuffle(remote);
+
+        logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote);
+
+        return Iterators.concat(local.iterator(), remote.iterator());
+    }
+
+    @Override
+    public void onAdd(Host host)
+    {
+        if (replicaAddresses.contains(host.getAddress()))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("Added a new host {}", host);
+        }
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+        if (replicaAddresses.contains(host.getAddress()))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("The host {} is now up", host);
+        }
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("The host {} is now down", host);
+        }
+    }
+
+
+    @Override
+    public void onRemove(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("Removed the host {}", host);
+        }
+    }
+
+    @Override
+    public void onSuspected(Host host)
+    {
+        // not supported by this load balancing policy
+    }
+
+    private static boolean isLocalHost(Host host)
+    {
+        InetAddress hostAddress = host.getAddress();
+        return hostAddress.isLoopbackAddress() || localAddresses.contains(hostAddress);
+    }
+
+    private static Set<InetAddress> getLocalInetAddresses()
+    {
+        try
+        {
+            return Sets.newHashSet(Iterators.concat(
+                    Iterators.transform(
+                            Iterators.forEnumeration(NetworkInterface.getNetworkInterfaces()),
+                            new Function<NetworkInterface, Iterator<InetAddress>>()
+                            {
+                                @Override
+                                public Iterator<InetAddress> apply(NetworkInterface netIface)
+                                {
+                                    return Iterators.forEnumeration(netIface.getInetAddresses());
+                                }
+                            })));
+        }
+        catch (SocketException e)
+        {
+            logger.warn("Could not retrieve local network interfaces.", e);
+            return Collections.emptySet();
+        }
+    }
+}


[5/5] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/314afb8d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/314afb8d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/314afb8d

Branch: refs/heads/trunk
Commit: 314afb8dfaa95bcfc5d3861ca0fe987b78402278
Parents: 753ccff 6e05125
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:44:12 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:44:12 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  89 +--------
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  19 +-
 ...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
 4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/314afb8d/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/314afb8d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------


[2/5] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce747d73
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce747d73
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce747d73

Branch: refs/heads/trunk
Commit: ce747d736a2dcd40971d7237d17d3ebb32abeb63
Parents: ab45a78 fe39eb7
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:42:26 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:42:26 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  89 +--------
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  19 +-
 ...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
 4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce747d73/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8866cf6,71cfca0..5103d83
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,46 @@@
 -2.0.10
 +2.1.1
 + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
 + * Support list index operations with conditions (CASSANDRA-7499)
 + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
 + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
 + * (cqlsh) Error when tracing query (CASSANDRA-7613)
 + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
 + * SSTableExport uses correct validator to create string representation of partition
 +   keys (CASSANDRA-7498)
 + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689)
 + * Add support for custom 2i validation (CASSANDRA-7575)
 + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
 + * Add listen_interface and rpc_interface options (CASSANDRA-7417)
 + * Improve schema merge performance (CASSANDRA-7444)
 + * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
 + * Optimise NativeCell comparisons (CASSANDRA-6755)
 + * Configurable client timeout for cqlsh (CASSANDRA-7516)
 + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
 +Merged from 2.0:
+  * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
 + * Throw InvalidRequestException when queries contain relations on entire
 +   collection columns (CASSANDRA-7506)
 + * (cqlsh) enable CTRL-R history search with libedit (CASSANDRA-7577)
 + * (Hadoop) allow ACFRW to limit nodes to local DC (CASSANDRA-7252)
 + * (cqlsh) cqlsh should automatically disable tracing when selecting
 +   from system_traces (CASSANDRA-7641)
 + * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
 + * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
 + * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
 + * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229)
 + * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635)
 + * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611)
 + * Catch errors when the JVM pulls the rug out from GCInspector (CASSANDRA-5345)
 + * cqlsh fails when version number parts are not int (CASSANDRA-7524)
 +Merged from 1.2:
 + * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)
 +
 +
 +2.1.0
 + * (cqlsh) Fix COPY FROM handling of null/empty primary key
 +   values (CASSANDRA-7792)
 + * Fix ordering of static cells (CASSANDRA-7763)
 +Merged from 2.0:
   * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
   * Fix ALTER clustering column type from DateType to TimestampType when
     using DESC clustering order (CASSANDRA-7797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce747d73/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------


[3/5] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58554dec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58554dec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58554dec

Branch: refs/heads/trunk
Commit: 58554decc22c6754cc6ce4492ada65b5d8fbfcf9
Parents: da45786 fe39eb7
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:42:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:42:48 2014 +0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[4/5] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6e051257
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6e051257
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6e051257

Branch: refs/heads/trunk
Commit: 6e0512576e5e920690ed1df1c920126dcb939010
Parents: ce747d7 58554de
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:43:23 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:43:23 2014 +0300

----------------------------------------------------------------------

----------------------------------------------------------------------