You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/08/20 19:44:22 UTC
[1/5] git commit: (Hadoop) fix cluster initialisation for a split fetching
Repository: cassandra
Updated Branches:
refs/heads/trunk 753ccff52 -> 314afb8df
(Hadoop) fix cluster initialisation for a split fetching
patch by Jacek Lewandowski; reviewed by Alex Liu for CASSANDRA-7774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe39eb7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe39eb7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe39eb7a
Branch: refs/heads/trunk
Commit: fe39eb7a9e2b017e3cd31b1c09693c8d565dee18
Parents: 44cfd95
Author: Jacek Lewandowski <le...@gmail.com>
Authored: Wed Aug 20 20:39:12 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:39:12 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 89 +--------
.../cassandra/hadoop/cql3/CqlRecordReader.java | 19 +-
...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17c0671..71cfca0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
* Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
* Fix ALTER clustering column type from DateType to TimestampType when
using DESC clustering order (CASSANRDA-7797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index e894996..137bddf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -288,16 +288,22 @@ public class CqlConfigHelper
public static Cluster getInputCluster(String host, Configuration conf)
{
+ // this method has been left for backward compatibility
+ return getInputCluster(new String[] {host}, conf);
+ }
+
+ public static Cluster getInputCluster(String[] hosts, Configuration conf)
+ {
int port = getInputNativePort(conf);
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
- LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+ LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
PoolingOptions poolingOptions = getReadPoolingOptions(conf);
Cluster.Builder builder = Cluster.builder()
- .addContactPoint(host)
+ .addContactPoints(hosts)
.withPort(port)
.withCompression(ProtocolOptions.Compression.NONE);
@@ -480,84 +486,9 @@ public class CqlConfigHelper
return socketOptions;
}
- private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
+ private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String[] stickHosts)
{
- return new LoadBalancingPolicy()
- {
- private Host origHost;
- private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
- @Override
- public void onAdd(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- }
-
- @Override
- public void onDown(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onRemove(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onUp(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- liveRemoteHosts.add(host);
- }
-
- @Override
- public void onSuspected(Host host)
- {
- }
-
- @Override
- public HostDistance distance(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- return HostDistance.LOCAL;
- else
- return HostDistance.REMOTE;
- }
-
- @Override
- public void init(Cluster cluster, Collection<Host> hosts)
- {
- for (Host host : hosts)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- {
- origHost = host;
- break;
- }
- }
- }
-
- @Override
- public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
- {
- if (origHost != null)
- {
- return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
- }
- else
- {
- return liveRemoteHosts.iterator();
- }
- }
- };
+ return new LimitedLocalNodeFirstLocalBalancingPolicy(stickHosts);
}
private static Optional<AuthProvider> getAuthProvider(Configuration conf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9167ac3..3eab7c0 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -124,24 +124,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
return;
- // create connection using thrift
+ // create a Cluster instance
String[] locations = split.getLocations();
- Exception lastException = null;
- for (String location : locations)
- {
- try
- {
- cluster = CqlConfigHelper.getInputCluster(location, conf);
- break;
- }
- catch (Exception e)
- {
- lastException = e;
- logger.warn("Failed to create authenticated client to {}", location);
- }
- }
- if (cluster == null && lastException != null)
- throw lastException;
+ cluster = CqlConfigHelper.getInputCluster(locations, conf);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
new file mode 100644
index 0000000..3aa7df0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -0,0 +1,185 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * This load balancing policy is intended to be used only for CqlRecordReader when it fetches a particular split.
+ * <p/>
+ * It chooses alive hosts only from the set of the given replicas - because the connection is used to load the data from
+ * the particular split, with a strictly defined list of the replicas, it is pointless to try the other nodes.
+ * The policy tracks which of the replicas are alive, and when a new query plan is requested, it returns those replicas
+ * in the following order:
+ * <ul>
+ * <li>the local node</li>
+ * <li>the collection of the remaining hosts (which is shuffled on each request)</li>
+ * </ul>
+ */
+class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
+{
+ private final static Logger logger = LoggerFactory.getLogger(LimitedLocalNodeFirstLocalBalancingPolicy.class);
+
+ private final static Set<InetAddress> localAddresses = Collections.unmodifiableSet(getLocalInetAddresses());
+
+ private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
+
+ private final Set<InetAddress> replicaAddresses = new HashSet<>();
+
+ public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
+ {
+ for (String replica : replicas)
+ {
+ try
+ {
+ InetAddress[] addresses = InetAddress.getAllByName(replica);
+ Collections.addAll(replicaAddresses, addresses);
+ }
+ catch (UnknownHostException e)
+ {
+ logger.warn("Invalid replica host name: {}, skipping it", replica);
+ }
+ }
+ logger.debug("Created instance with the following replicas: {}", Arrays.asList(replicas));
+ }
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts)
+ {
+ List<Host> replicaHosts = new ArrayList<>();
+ for (Host host : hosts)
+ {
+ if (replicaAddresses.contains(host.getAddress()))
+ {
+ replicaHosts.add(host);
+ }
+ }
+ liveReplicaHosts.addAll(replicaHosts);
+ logger.debug("Initialized with replica hosts: {}", replicaHosts);
+ }
+
+ @Override
+ public HostDistance distance(Host host)
+ {
+ if (isLocalHost(host))
+ {
+ return HostDistance.LOCAL;
+ }
+ else
+ {
+ return HostDistance.REMOTE;
+ }
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String keyspace, Statement statement)
+ {
+ List<Host> local = new ArrayList<>(1);
+ List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
+ for (Host liveReplicaHost : liveReplicaHosts)
+ {
+ if (isLocalHost(liveReplicaHost))
+ {
+ local.add(liveReplicaHost);
+ }
+ else
+ {
+ remote.add(liveReplicaHost);
+ }
+ }
+
+ Collections.shuffle(remote);
+
+ logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote);
+
+ return Iterators.concat(local.iterator(), remote.iterator());
+ }
+
+ @Override
+ public void onAdd(Host host)
+ {
+ if (replicaAddresses.contains(host.getAddress()))
+ {
+ liveReplicaHosts.add(host);
+ logger.debug("Added a new host {}", host);
+ }
+ }
+
+ @Override
+ public void onUp(Host host)
+ {
+ if (replicaAddresses.contains(host.getAddress()))
+ {
+ liveReplicaHosts.add(host);
+ logger.debug("The host {} is now up", host);
+ }
+ }
+
+ @Override
+ public void onDown(Host host)
+ {
+ if (liveReplicaHosts.remove(host))
+ {
+ logger.debug("The host {} is now down", host);
+ }
+ }
+
+
+ @Override
+ public void onRemove(Host host)
+ {
+ if (liveReplicaHosts.remove(host))
+ {
+ logger.debug("Removed the host {}", host);
+ }
+ }
+
+ @Override
+ public void onSuspected(Host host)
+ {
+ // not supported by this load balancing policy
+ }
+
+ private static boolean isLocalHost(Host host)
+ {
+ InetAddress hostAddress = host.getAddress();
+ return hostAddress.isLoopbackAddress() || localAddresses.contains(hostAddress);
+ }
+
+ private static Set<InetAddress> getLocalInetAddresses()
+ {
+ try
+ {
+ return Sets.newHashSet(Iterators.concat(
+ Iterators.transform(
+ Iterators.forEnumeration(NetworkInterface.getNetworkInterfaces()),
+ new Function<NetworkInterface, Iterator<InetAddress>>()
+ {
+ @Override
+ public Iterator<InetAddress> apply(NetworkInterface netIface)
+ {
+ return Iterators.forEnumeration(netIface.getInetAddresses());
+ }
+ })));
+ }
+ catch (SocketException e)
+ {
+ logger.warn("Could not retrieve local network interfaces.", e);
+ return Collections.emptySet();
+ }
+ }
+}
[5/5] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/314afb8d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/314afb8d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/314afb8d
Branch: refs/heads/trunk
Commit: 314afb8dfaa95bcfc5d3861ca0fe987b78402278
Parents: 753ccff 6e05125
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:44:12 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:44:12 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 89 +--------
.../cassandra/hadoop/cql3/CqlRecordReader.java | 19 +-
...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/314afb8d/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/314afb8d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
[2/5] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce747d73
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce747d73
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce747d73
Branch: refs/heads/trunk
Commit: ce747d736a2dcd40971d7237d17d3ebb32abeb63
Parents: ab45a78 fe39eb7
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:42:26 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:42:26 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 89 +--------
.../cassandra/hadoop/cql3/CqlRecordReader.java | 19 +-
...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce747d73/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8866cf6,71cfca0..5103d83
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,46 @@@
-2.0.10
+2.1.1
+ * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
+ * Support list index operations with conditions (CASSANDRA-7499)
+ * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
+ * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
+ * (cqlsh) Error when tracing query (CASSANDRA-7613)
+ * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
+ * SSTableExport uses correct validator to create string representation of partition
+ keys (CASSANDRA-7498)
+ * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689)
+ * Add support for custom 2i validation (CASSANDRA-7575)
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
+ * Add listen_interface and rpc_interface options (CASSANDRA-7417)
+ * Improve schema merge performance (CASSANDRA-7444)
+ * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
+ * Optimise NativeCell comparisons (CASSANDRA-6755)
+ * Configurable client timeout for cqlsh (CASSANDRA-7516)
+ * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
+Merged from 2.0:
+ * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
+ * Throw InvalidRequestException when queries contain relations on entire
+ collection columns (CASSANDRA-7506)
+ * (cqlsh) enable CTRL-R history search with libedit (CASSANDRA-7577)
+ * (Hadoop) allow ACFRW to limit nodes to local DC (CASSANDRA-7252)
+ * (cqlsh) cqlsh should automatically disable tracing when selecting
+ from system_traces (CASSANDRA-7641)
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
+ * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
+ * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
+ * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229)
+ * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635)
+ * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611)
+ * Catch errors when the JVM pulls the rug out from GCInspector (CASSANDRA-5345)
+ * cqlsh fails when version number parts are not int (CASSANDRA-7524)
+Merged from 1.2:
+ * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)
+
+
+2.1.0
+ * (cqlsh) Fix COPY FROM handling of null/empty primary key
+ values (CASSANDRA-7792)
+ * Fix ordering of static cells (CASSANDRA-7763)
+Merged from 2.0:
* Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
* Fix ALTER clustering column type from DateType to TimestampType when
using DESC clustering order (CASSANRDA-7797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce747d73/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
[3/5] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58554dec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58554dec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58554dec
Branch: refs/heads/trunk
Commit: 58554decc22c6754cc6ce4492ada65b5d8fbfcf9
Parents: da45786 fe39eb7
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:42:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:42:48 2014 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[4/5] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6e051257
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6e051257
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6e051257
Branch: refs/heads/trunk
Commit: 6e0512576e5e920690ed1df1c920126dcb939010
Parents: ce747d7 58554de
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 20 20:43:23 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 20 20:43:23 2014 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------