You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/10 15:34:22 UTC

cassandra git commit: (Hadoop) ensure that Cluster instances are always closed

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 ef0e447ad -> 177f60705


(Hadoop) ensure that Cluster instances are always closed

patch by Alex Liu; reviewed by Aleksey Yeschenko for CASSANDRA-10058


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/177f6070
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/177f6070
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/177f6070

Branch: refs/heads/cassandra-2.2
Commit: 177f607057a9d4c4b3746cec51e8e283938a5363
Parents: ef0e447
Author: Alex Liu <al...@yahoo.com>
Authored: Tue Nov 10 14:32:55 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 10 14:34:00 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/AbstractColumnFamilyInputFormat.java | 74 +++++++--------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 96 ++++++++++----------
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  4 +-
 4 files changed, 91 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5edad20..81ceb25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.4
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
  * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
  * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
  * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index e531ad1..d687183 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
@@ -58,7 +59,6 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
-    private Session session;
 
     protected void validateConfiguration(Configuration conf)
     {
@@ -90,36 +90,36 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<InputSplit> splits = new ArrayList<>();
 
-        try
+        List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
+        KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+        Range<Token> jobRange = null;
+        if (jobKeyRange != null)
         {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            Range<Token> jobRange = null;
-            if (jobKeyRange != null)
+            if (jobKeyRange.start_key != null)
             {
-                if (jobKeyRange.start_key != null)
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
-                                           partitioner.getToken(jobKeyRange.end_key));
-                }
-                else if (jobKeyRange.start_token != null)
-                {
-                    jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
-                                           partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
-                }
-                else
-                {
-                    logger.warn("ignoring jobKeyRange specified without start_key or start_token");
-                }
+                if (!partitioner.preservesOrder())
+                    throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
+                if (jobKeyRange.start_token != null)
+                    throw new IllegalArgumentException("only start_key supported");
+                if (jobKeyRange.end_token != null)
+                    throw new IllegalArgumentException("only start_key supported");
+                jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
+                                       partitioner.getToken(jobKeyRange.end_key));
             }
+            else if (jobKeyRange.start_token != null)
+            {
+                jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                       partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
+            }
+            else
+            {
+                logger.warn("ignoring jobKeyRange specified without start_key or start_token");
+            }
+        }
 
-            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
+        {
+            Session session = cluster.connect();
             Metadata metadata = session.getCluster().getMetadata();
 
             for (TokenRange range : masterRangeNodes.keySet())
@@ -127,7 +127,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 if (jobRange == null)
                 {
                     // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
                 }
                 else
                 {
@@ -137,7 +137,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                         for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
                             // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
+                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf, session)));
                         }
                     }
                 }
@@ -182,19 +182,21 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         private final TokenRange tokenRange;
         private final Set<Host> hosts;
         private final Configuration conf;
+        private final Session session;
 
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
         {
             this.tokenRange = tr;
             this.hosts = hosts;
             this.conf = conf;
+            this.session = session;
         }
 
         public List<InputSplit> call() throws Exception
         {
             ArrayList<InputSplit> splits = new ArrayList<>();
             Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
             // turn the sub-ranges into InputSplits
             String[] endpoints = new String[hosts.size()];
 
@@ -225,12 +227,12 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         }
     }
 
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
     {
         int splitSize = ConfigHelper.getInputSplitSize(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize);
+            return describeSplits(keyspace, cfName, range, splitSize, session);
         }
         catch (Exception e)
         {
@@ -240,17 +242,17 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
 
     private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
     {
-        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
         {
             Map<TokenRange, Set<Host>> map = new HashMap<>();
-            Metadata metadata = session.getCluster().getMetadata();
+            Metadata metadata = cluster.connect().getCluster().getMetadata();
             for (TokenRange tokenRange : metadata.getTokenRanges())
                 map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
             return map;
         }
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 6e8ffd9..14e24fb 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -113,27 +113,25 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
 
-        try
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+            Session client = cluster.connect(keyspace);
+            ringCache = new NativeRingCache(conf);
+            if (client != null)
             {
-                ringCache = new NativeRingCache(conf);
-                if (client != null)
-                {
-                    TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
-                    clusterColumns = tableMetadata.getClusteringColumns();
-                    partitionKeyColumns = tableMetadata.getPartitionKey();
-
-                    String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
-                    if (cqlQuery.toLowerCase().startsWith("insert"))
-                        throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
-                    cql = appendKeyWhereClauses(cqlQuery);
-                }
-                else
-                {
-                    throw new IllegalArgumentException("Invalid configuration specified " + conf);
-                }
+                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+                clusterColumns = tableMetadata.getClusteringColumns();
+                partitionKeyColumns = tableMetadata.getPartitionKey();
+
+                String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+                if (cqlQuery.toLowerCase().startsWith("insert"))
+                    throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+                cql = appendKeyWhereClauses(cqlQuery);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Invalid configuration specified " + conf);
             }
         }
         catch (Exception e)
@@ -235,7 +233,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
     {
         // The list of endpoints for this range
         protected final List<InetAddress> endpoints;
-        protected Session client;
+        protected Cluster cluster = null;
         // A bounded queue of incoming mutations for this range
         protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
 
@@ -281,6 +279,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
          */
         public void run()
         {
+            Session session = null;
             outer:
             while (run || !queue.isEmpty())
             {
@@ -299,34 +298,37 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 while (true)
                 {
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                    try
+                    if (session != null)
                     {
-                        int i = 0;
-                        PreparedStatement statement = preparedStatement(client);
-                        while (bindVariables != null)
+                        try
                         {
-                            BoundStatement boundStatement = new BoundStatement(statement);
-                            for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                            int i = 0;
+                            PreparedStatement statement = preparedStatement(session);
+                            while (bindVariables != null)
                             {
-                                boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                BoundStatement boundStatement = new BoundStatement(statement);
+                                for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                {
+                                    boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                }
+                                session.execute(boundStatement);
+                                i++;
+
+                                if (i >= batchThreshold)
+                                    break;
+                                bindVariables = queue.poll();
                             }
-                            client.execute(boundStatement);
-                            i++;
-
-                            if (i >= batchThreshold)
-                                break;
-                            bindVariables = queue.poll();
+                            break;
                         }
-                        break;
-                    }
-                    catch (Exception e)
-                    {
-                        closeInternal();
-                        if (!iter.hasNext())
+                        catch (Exception e)
                         {
-                            lastException = new IOException(e);
-                            break outer;
-                        }
+                            closeInternal();
+                            if (!iter.hasNext())
+                            {
+                                lastException = new IOException(e);
+                                break outer;
+                            }
+                        }                        
                     }
 
                     // attempt to connect to a different endpoint
@@ -334,7 +336,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                     {
                         InetAddress address = iter.next();
                         String host = address.getHostName();
-                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
+                        cluster = CqlConfigHelper.getOutputCluster(host, conf);
+                        session = cluster.connect();
                     }
                     catch (Exception e)
                     {
@@ -404,9 +407,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
 
         protected void closeInternal()
         {
-            if (client != null)
+            if (cluster != null)
             {
-                client.close();
+                cluster.close();
             }
         }
 
@@ -486,15 +489,14 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         private void refreshEndpointMap()
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
             {
+                Session session = cluster.connect(keyspace);
                 rangeMap = new HashMap<>();
                 metadata = session.getCluster().getMetadata();
                 Set<TokenRange> ranges = metadata.getTokenRanges();
                 for (TokenRange range : ranges)
-                {
                     rangeMap.put(range, metadata.getReplicas(keyspace, range));
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index ba0a37d..74058b1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.Row;
@@ -723,8 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
         // Only get the schema if we haven't already gotten it
         if (!properties.containsKey(signature))
         {
-            try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
+            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf))
             {
+                Session client = cluster.connect();
                 client.execute("USE " + keyspace);
 
                 // compose the CfDef for the columfamily