You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/10 15:34:22 UTC
cassandra git commit: (Hadoop) ensure that Cluster instances are always closed
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 ef0e447ad -> 177f60705
(Hadoop) ensure that Cluster instances are always closed
patch by Alex Liu; reviewed by Aleksey Yeschenko for CASSANDRA-10058
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/177f6070
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/177f6070
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/177f6070
Branch: refs/heads/cassandra-2.2
Commit: 177f607057a9d4c4b3746cec51e8e283938a5363
Parents: ef0e447
Author: Alex Liu <al...@yahoo.com>
Authored: Tue Nov 10 14:32:55 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 10 14:34:00 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractColumnFamilyInputFormat.java | 74 +++++++--------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 96 ++++++++++----------
.../cassandra/hadoop/pig/CqlNativeStorage.java | 4 +-
4 files changed, 91 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5edad20..81ceb25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.4
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
* (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
* Use most up-to-date version of schema for system tables (CASSANDRA-10652)
* Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index e531ad1..d687183 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
@@ -58,7 +59,6 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private String keyspace;
private String cfName;
private IPartitioner partitioner;
- private Session session;
protected void validateConfiguration(Configuration conf)
{
@@ -90,36 +90,36 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
List<InputSplit> splits = new ArrayList<>();
- try
+ List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
+ KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+ Range<Token> jobRange = null;
+ if (jobKeyRange != null)
{
- List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
- KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
- Range<Token> jobRange = null;
- if (jobKeyRange != null)
+ if (jobKeyRange.start_key != null)
{
- if (jobKeyRange.start_key != null)
- {
- if (!partitioner.preservesOrder())
- throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
- if (jobKeyRange.start_token != null)
- throw new IllegalArgumentException("only start_key supported");
- if (jobKeyRange.end_token != null)
- throw new IllegalArgumentException("only start_key supported");
- jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
- partitioner.getToken(jobKeyRange.end_key));
- }
- else if (jobKeyRange.start_token != null)
- {
- jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
- partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
- }
- else
- {
- logger.warn("ignoring jobKeyRange specified without start_key or start_token");
- }
+ if (!partitioner.preservesOrder())
+ throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
+ if (jobKeyRange.start_token != null)
+ throw new IllegalArgumentException("only start_key supported");
+ if (jobKeyRange.end_token != null)
+ throw new IllegalArgumentException("only start_key supported");
+ jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
+ partitioner.getToken(jobKeyRange.end_key));
}
+ else if (jobKeyRange.start_token != null)
+ {
+ jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+ partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
+ }
+ else
+ {
+ logger.warn("ignoring jobKeyRange specified without start_key or start_token");
+ }
+ }
- session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
+ {
+ Session session = cluster.connect();
Metadata metadata = session.getCluster().getMetadata();
for (TokenRange range : masterRangeNodes.keySet())
@@ -127,7 +127,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
if (jobRange == null)
{
// for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+ splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
}
else
{
@@ -137,7 +137,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
// for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
+ splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
}
}
}
@@ -182,19 +182,21 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private final TokenRange tokenRange;
private final Set<Host> hosts;
private final Configuration conf;
+ private final Session session;
- public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+ public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
{
this.tokenRange = tr;
this.hosts = hosts;
this.conf = conf;
+ this.session = session;
}
public List<InputSplit> call() throws Exception
{
ArrayList<InputSplit> splits = new ArrayList<>();
Map<TokenRange, Long> subSplits;
- subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+ subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
// turn the sub-ranges into InputSplits
String[] endpoints = new String[hosts.size()];
@@ -225,12 +227,12 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
{
int splitSize = ConfigHelper.getInputSplitSize(conf);
try
{
- return describeSplits(keyspace, cfName, range, splitSize);
+ return describeSplits(keyspace, cfName, range, splitSize, session);
}
catch (Exception e)
{
@@ -240,17 +242,17 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
{
Map<TokenRange, Set<Host>> map = new HashMap<>();
- Metadata metadata = session.getCluster().getMetadata();
+ Metadata metadata = cluster.connect().getCluster().getMetadata();
for (TokenRange tokenRange : metadata.getTokenRanges())
map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
return map;
}
}
- private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session)
{
String query = String.format("SELECT mean_partition_size, partitions_count " +
"FROM %s.%s " +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 6e8ffd9..14e24fb 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -113,27 +113,25 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
- try
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+ Session client = cluster.connect(keyspace);
+ ringCache = new NativeRingCache(conf);
+ if (client != null)
{
- ringCache = new NativeRingCache(conf);
- if (client != null)
- {
- TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
- clusterColumns = tableMetadata.getClusteringColumns();
- partitionKeyColumns = tableMetadata.getPartitionKey();
-
- String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
- if (cqlQuery.toLowerCase().startsWith("insert"))
- throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
- cql = appendKeyWhereClauses(cqlQuery);
- }
- else
- {
- throw new IllegalArgumentException("Invalid configuration specified " + conf);
- }
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid configuration specified " + conf);
}
}
catch (Exception e)
@@ -235,7 +233,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
{
// The list of endpoints for this range
protected final List<InetAddress> endpoints;
- protected Session client;
+ protected Cluster cluster = null;
// A bounded queue of incoming mutations for this range
protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
@@ -281,6 +279,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
*/
public void run()
{
+ Session session = null;
outer:
while (run || !queue.isEmpty())
{
@@ -299,34 +298,37 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
while (true)
{
// send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
- try
+ if (session != null)
{
- int i = 0;
- PreparedStatement statement = preparedStatement(client);
- while (bindVariables != null)
+ try
{
- BoundStatement boundStatement = new BoundStatement(statement);
- for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ int i = 0;
+ PreparedStatement statement = preparedStatement(session);
+ while (bindVariables != null)
{
- boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ session.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
}
- client.execute(boundStatement);
- i++;
-
- if (i >= batchThreshold)
- break;
- bindVariables = queue.poll();
+ break;
}
- break;
- }
- catch (Exception e)
- {
- closeInternal();
- if (!iter.hasNext())
+ catch (Exception e)
{
- lastException = new IOException(e);
- break outer;
- }
+ closeInternal();
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
}
// attempt to connect to a different endpoint
@@ -334,7 +336,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
{
InetAddress address = iter.next();
String host = address.getHostName();
- client = CqlConfigHelper.getOutputCluster(host, conf).connect();
+ cluster = CqlConfigHelper.getOutputCluster(host, conf);
+ session = cluster.connect();
}
catch (Exception e)
{
@@ -404,9 +407,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
protected void closeInternal()
{
- if (client != null)
+ if (cluster != null)
{
- client.close();
+ cluster.close();
}
}
@@ -486,15 +489,14 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
private void refreshEndpointMap()
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
{
+ Session session = cluster.connect(keyspace);
rangeMap = new HashMap<>();
metadata = session.getCluster().getMetadata();
Set<TokenRange> ranges = metadata.getTokenRanges();
for (TokenRange range : ranges)
- {
rangeMap.put(range, metadata.getReplicas(keyspace, range));
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index ba0a37d..74058b1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.util.*;
+import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Row;
@@ -723,8 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
// Only get the schema if we haven't already gotten it
if (!properties.containsKey(signature))
{
- try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf))
{
+ Session client = cluster.connect();
client.execute("USE " + keyspace);
// compose the CfDef for the columfamily