You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this copy.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/02/02 15:01:43 UTC
[05/15] cassandra git commit: test_bulk_round_trip_blogposts is failing occasionally
test_bulk_round_trip_blogposts is failing occasionally
Patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-10938
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/165f586e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/165f586e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/165f586e
Branch: refs/heads/cassandra-3.3
Commit: 165f586e6f5e7e5d08f0b85e5b00dbe1f68e3e8f
Parents: 3b794f0
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 11 09:31:36 2016 +0000
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Feb 2 14:51:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
pylib/cqlshlib/copyutil.py | 87 +++++++++++++-------
.../cassandra/transport/ServerConnection.java | 5 +-
3 files changed, 58 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e53d8..1793c32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.13
+ * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938)
* Fix isJoined return true only after becoming cluster member (CASANDRA-11007)
* Fix bad gossip generation seen in long-running clusters (CASSANDRA-10969)
* Avoid NPE when incremental repair fails (CASSANDRA-10909)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index b015a77..f9e4a85 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -1106,7 +1106,7 @@ class ExportSession(object):
session.default_timeout = export_process.options.copy['pagetimeout']
export_process.printdebugmsg("Created connection to %s with page size %d and timeout %d seconds per page"
- % (session.hosts, session.default_fetch_size, session.default_timeout))
+ % (cluster.contact_points, session.default_fetch_size, session.default_timeout))
self.cluster = cluster
self.session = session
@@ -1175,16 +1175,20 @@ class ExportProcess(ChildProcess):
token_range, info = self.inmsg.get()
self.start_request(token_range, info)
- def report_error(self, err, token_range=None):
+ @staticmethod
+ def get_error_message(err, print_traceback=False):
if isinstance(err, str):
msg = err
elif isinstance(err, BaseException):
msg = "%s - %s" % (err.__class__.__name__, err)
- if self.debug:
+ if print_traceback:
traceback.print_exc(err)
else:
msg = str(err)
+ return msg
+ def report_error(self, err, token_range=None):
+ msg = self.get_error_message(err, print_traceback=self.debug)
self.printdebugmsg(msg)
self.outmsg.put((token_range, Exception(msg)))
@@ -1193,48 +1197,67 @@ class ExportProcess(ChildProcess):
Begin querying a range by executing an async query that
will later on invoke the callbacks attached in attach_callbacks.
"""
- session = self.get_session(info['hosts'])
- metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table]
- query = self.prepare_query(metadata.partition_key, token_range, info['attempts'])
- future = session.execute_async(query)
- self.attach_callbacks(token_range, future, session)
+ session = self.get_session(info['hosts'], token_range)
+ if session:
+ metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table]
+ query = self.prepare_query(metadata.partition_key, token_range, info['attempts'])
+ future = session.execute_async(query)
+ self.attach_callbacks(token_range, future, session)
def num_requests(self):
return sum(session.num_requests() for session in self.hosts_to_sessions.values())
- def get_session(self, hosts):
+ def get_session(self, hosts, token_range):
"""
- We select a host to connect to. If we have no connections to one of the hosts
- yet then we select this host, else we pick the one with the smallest number
- of requests.
+ We return a session connected to one of the hosts passed in, which are valid replicas for
+ the token range. We sort replicas by favouring those without any active requests yet or with the
+ smallest number of requests. If we fail to connect we report an error so that the token will
+ be retried again later.
:return: An ExportSession connected to the chosen host.
"""
- new_hosts = [h for h in hosts if h not in self.hosts_to_sessions]
- if new_hosts:
- host = new_hosts[0]
- new_cluster = Cluster(
- contact_points=(host,),
- port=self.port,
- cql_version=self.cql_version,
- protocol_version=self.protocol_version,
- auth_provider=self.auth_provider,
- ssl_options=ssl_settings(host, self.config_file) if self.ssl else None,
- load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
- default_retry_policy=ExpBackoffRetryPolicy(self),
- compression=None,
- control_connection_timeout=self.connect_timeout,
- connect_timeout=self.connect_timeout)
+ # sorted replicas favouring those with no connections yet
+ hosts = sorted(hosts,
+ key=lambda hh: 0 if hh not in self.hosts_to_sessions else self.hosts_to_sessions[hh].requests)
- session = ExportSession(new_cluster, self)
- self.hosts_to_sessions[host] = session
- return session
- else:
- host = min(hosts, key=lambda hh: self.hosts_to_sessions[hh].requests)
+ errors = []
+ ret = None
+ for host in hosts:
+ try:
+ ret = self.connect(host)
+ except Exception, e:
+ errors.append(self.get_error_message(e))
+
+ if ret:
+ if errors:
+ self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
+ return ret
+
+ self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+ return None
+
+ def connect(self, host):
+ if host in self.hosts_to_sessions.keys():
session = self.hosts_to_sessions[host]
session.add_request()
return session
+ new_cluster = Cluster(
+ contact_points=(host,),
+ port=self.port,
+ cql_version=self.cql_version,
+ protocol_version=self.protocol_version,
+ auth_provider=self.auth_provider,
+ ssl_options=ssl_settings(host, self.config_file) if self.ssl else None,
+ load_balancing_policy=WhiteListRoundRobinPolicy([host]),
+ default_retry_policy=ExpBackoffRetryPolicy(self),
+ compression=None,
+ control_connection_timeout=self.connect_timeout,
+ connect_timeout=self.connect_timeout)
+ session = ExportSession(new_cluster, self)
+ self.hosts_to_sessions[host] = session
+ return session
+
def attach_callbacks(self, token_range, future, session):
def result_callback(rows):
if future.has_more_pages:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 5991b33..ce4d164 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.transport;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.channel.Channel;
@@ -28,8 +29,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
public class ServerConnection extends Connection
{
private enum State { UNINITIALIZED, AUTHENTICATION, READY }
@@ -38,7 +37,7 @@ public class ServerConnection extends Connection
private final ClientState clientState;
private volatile State state;
- private final ConcurrentMap<Integer, QueryState> queryStates = new NonBlockingHashMap<>();
+ private final ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>();
public ServerConnection(Channel channel, int version, Connection.Tracker tracker)
{