You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/12/17 12:47:11 UTC

cassandra git commit: Close Clusters and Sessions in Hadoop Input/Output classes

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 ed96322a0 -> 2da3c9db1


Close Clusters and Sessions in Hadoop Input/Output classes

patch by Alex Liu; reviewed by Benjamin Lerer for CASSANDRA-10837


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2da3c9db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2da3c9db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2da3c9db

Branch: refs/heads/cassandra-3.0
Commit: 2da3c9db154449e15d5a2c2072db77b65c9e931a
Parents: ed96322
Author: Alex Liu <al...@yahoo.com>
Authored: Thu Dec 17 12:18:46 2015 +0100
Committer: blerer <be...@datastax.com>
Committed: Thu Dec 17 12:18:46 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/hadoop/cql3/CqlInputFormat.java   | 46 +++++++-------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 63 +++++++-------------
 3 files changed, 46 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7677e38..a2951a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)
  * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 Merged from 2.2:
  * Add new types to Stress (CASSANDRA-9556)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 534e66d..a426532 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -21,12 +21,14 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TokenRange;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,12 +39,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.hadoop.*;
 
+import static java.util.stream.Collectors.toMap;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -72,7 +74,6 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
-    private Session session;
 
     public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
             throws IOException
@@ -123,14 +124,12 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         partitioner = ConfigHelper.getInputPartitioner(conf);
         logger.trace("partitioner is {}", partitioner);
 
-        // canonical ranges and nodes holding replicas
-        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
-
         // canonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
 
-        try
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+             Session session = cluster.connect())
         {
             List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
@@ -159,15 +158,17 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                 }
             }
 
-            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
-            Metadata metadata = session.getCluster().getMetadata();
+            Metadata metadata = cluster.getMetadata();
+
+            // canonical ranges and nodes holding replicas
+            Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
 
             for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
                     // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
                 }
                 else
                 {
@@ -177,7 +178,7 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                         for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
                             // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
+                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf, session)));
                         }
                     }
                 }
@@ -212,13 +213,13 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                 metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
     }
 
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
     {
         int splitSize = ConfigHelper.getInputSplitSize(conf);
         int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb);
+            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
         }
         catch (Exception e)
         {
@@ -226,19 +227,14 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         }
     }
 
-    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
+    private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
     {
-        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
-        {
-            Map<TokenRange, Set<Host>> map = new HashMap<>();
-            Metadata metadata = session.getCluster().getMetadata();
-            for (TokenRange tokenRange : metadata.getTokenRanges())
-                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
-            return map;
-        }
+        return metadata.getTokenRanges()
+                       .stream()
+                       .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +
@@ -303,19 +299,21 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         private final TokenRange tokenRange;
         private final Set<Host> hosts;
         private final Configuration conf;
+        private final Session session;
 
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
         {
             this.tokenRange = tr;
             this.hosts = hosts;
             this.conf = conf;
+            this.session = session;
         }
 
         public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
         {
             ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
             Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
             // turn the sub-ranges into InputSplits
             String[] endpoints = new String[hosts.size()];
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 96815ef..4c9b249 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.*;
+
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
+import static java.util.stream.Collectors.toMap;
+
 /**
  * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra table. In particular, it applies the binded variables
@@ -113,25 +116,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         this.clients = new HashMap<>();
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
 
-        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
-             Session client = cluster.connect(keyspace))
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
         {
-            ringCache = new NativeRingCache(conf);
-            if (client != null)
-            {
-                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
-                clusterColumns = tableMetadata.getClusteringColumns();
-                partitionKeyColumns = tableMetadata.getPartitionKey();
-
-                String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
-                if (cqlQuery.toLowerCase().startsWith("insert"))
-                    throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
-                cql = appendKeyWhereClauses(cqlQuery);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Invalid configuration specified " + conf);
-            }
+            Metadata metadata = cluster.getMetadata();
+            ringCache = new NativeRingCache(conf, metadata);
+            TableMetadata tableMetadata = metadata.getKeyspace(Metadata.quote(keyspace)).getTable(ConfigHelper.getOutputColumnFamily(conf));
+            clusterColumns = tableMetadata.getClusteringColumns();
+            partitionKeyColumns = tableMetadata.getPartitionKey();
+
+            String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+            if (cqlQuery.toLowerCase().startsWith("insert"))
+                throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+            cql = appendKeyWhereClauses(cqlQuery);
         }
         catch (Exception e)
         {
@@ -383,9 +379,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
             finally
             {
                 closeSession(session);
+                // close all our connections once we are done.
+                closeInternal();
             }
-            // close all our connections once we are done.
-            closeInternal();
         }
 
         /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
@@ -496,31 +492,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
 
     static class NativeRingCache
     {
-        private Map<TokenRange, Set<Host>> rangeMap;
-        private Metadata metadata;
+        private final Map<TokenRange, Set<Host>> rangeMap;
+        private final Metadata metadata;
         private final IPartitioner partitioner;
-        private final Configuration conf;
 
-        public NativeRingCache(Configuration conf)
+        public NativeRingCache(Configuration conf, Metadata metadata)
         {
-            this.conf = conf;
             this.partitioner = ConfigHelper.getOutputPartitioner(conf);
-            refreshEndpointMap();
-        }
-
-
-        private void refreshEndpointMap()
-        {
+            this.metadata = metadata;
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
-                 Session session = cluster.connect(keyspace))
-            {
-                rangeMap = new HashMap<>();
-                metadata = session.getCluster().getMetadata();
-                Set<TokenRange> ranges = metadata.getTokenRanges();
-                for (TokenRange range : ranges)
-                    rangeMap.put(range, metadata.getReplicas(keyspace, range));
-            }
+            this.rangeMap = metadata.getTokenRanges()
+                                    .stream()
+                                    .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
         }
 
         public TokenRange getRange(ByteBuffer key)