You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/12/17 12:56:37 UTC
[1/2] cassandra git commit: Close Clusters and Sessions in Hadoop
Input/Output classes
Repository: cassandra
Updated Branches:
refs/heads/trunk 37986e825 -> 50f7e0204
Close Clusters and Sessions in Hadoop Input/Output classes
patch by Alex Liu; reviewed by Benjamin Lerer for CASSANDRA-10837
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2da3c9db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2da3c9db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2da3c9db
Branch: refs/heads/trunk
Commit: 2da3c9db154449e15d5a2c2072db77b65c9e931a
Parents: ed96322
Author: Alex Liu <al...@yahoo.com>
Authored: Thu Dec 17 12:18:46 2015 +0100
Committer: blerer <be...@datastax.com>
Committed: Thu Dec 17 12:18:46 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlInputFormat.java | 46 +++++++-------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 63 +++++++-------------
3 files changed, 46 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7677e38..a2951a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.3
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
* Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
Merged from 2.2:
* Add new types to Stress (CASSANDRA-9556)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 534e66d..a426532 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -21,12 +21,14 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
+import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TokenRange;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -37,12 +39,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.hadoop.*;
+import static java.util.stream.Collectors.toMap;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -72,7 +74,6 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
private String keyspace;
private String cfName;
private IPartitioner partitioner;
- private Session session;
public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
throws IOException
@@ -123,14 +124,12 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
partitioner = ConfigHelper.getInputPartitioner(conf);
logger.trace("partitioner is {}", partitioner);
- // canonical ranges and nodes holding replicas
- Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
-
// canonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
- try
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+ Session session = cluster.connect())
{
List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
@@ -159,15 +158,17 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
}
}
- session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
- Metadata metadata = session.getCluster().getMetadata();
+ Metadata metadata = cluster.getMetadata();
+
+ // canonical ranges and nodes holding replicas
+ Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
for (TokenRange range : masterRangeNodes.keySet())
{
if (jobRange == null)
{
// for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+ splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
}
else
{
@@ -177,7 +178,7 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
// for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
+ splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
}
}
}
@@ -212,13 +213,13 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
}
- private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
{
int splitSize = ConfigHelper.getInputSplitSize(conf);
int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
try
{
- return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb);
+ return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
}
catch (Exception e)
{
@@ -226,19 +227,14 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
}
}
- private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
+ private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
{
- try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
- {
- Map<TokenRange, Set<Host>> map = new HashMap<>();
- Metadata metadata = session.getCluster().getMetadata();
- for (TokenRange tokenRange : metadata.getTokenRanges())
- map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
- return map;
- }
+ return metadata.getTokenRanges()
+ .stream()
+ .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
}
- private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb)
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
{
String query = String.format("SELECT mean_partition_size, partitions_count " +
"FROM %s.%s " +
@@ -303,19 +299,21 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
private final TokenRange tokenRange;
private final Set<Host> hosts;
private final Configuration conf;
+ private final Session session;
- public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+ public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
{
this.tokenRange = tr;
this.hosts = hosts;
this.conf = conf;
+ this.session = session;
}
public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
{
ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
Map<TokenRange, Long> subSplits;
- subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+ subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
// turn the sub-ranges into InputSplits
String[] endpoints = new String[hosts.size()];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 96815ef..4c9b249 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.*;
+
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
+import static java.util.stream.Collectors.toMap;
+
/**
* The <code>CqlRecordWriter</code> maps the output <key, value>
* pairs to a Cassandra table. In particular, it applies the binded variables
@@ -113,25 +116,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
this.clients = new HashMap<>();
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
- Session client = cluster.connect(keyspace))
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
{
- ringCache = new NativeRingCache(conf);
- if (client != null)
- {
- TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
- clusterColumns = tableMetadata.getClusteringColumns();
- partitionKeyColumns = tableMetadata.getPartitionKey();
-
- String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
- if (cqlQuery.toLowerCase().startsWith("insert"))
- throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
- cql = appendKeyWhereClauses(cqlQuery);
- }
- else
- {
- throw new IllegalArgumentException("Invalid configuration specified " + conf);
- }
+ Metadata metadata = cluster.getMetadata();
+ ringCache = new NativeRingCache(conf, metadata);
+ TableMetadata tableMetadata = metadata.getKeyspace(Metadata.quote(keyspace)).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
}
catch (Exception e)
{
@@ -383,9 +379,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
finally
{
closeSession(session);
+ // close all our connections once we are done.
+ closeInternal();
}
- // close all our connections once we are done.
- closeInternal();
}
/** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
@@ -496,31 +492,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
static class NativeRingCache
{
- private Map<TokenRange, Set<Host>> rangeMap;
- private Metadata metadata;
+ private final Map<TokenRange, Set<Host>> rangeMap;
+ private final Metadata metadata;
private final IPartitioner partitioner;
- private final Configuration conf;
- public NativeRingCache(Configuration conf)
+ public NativeRingCache(Configuration conf, Metadata metadata)
{
- this.conf = conf;
this.partitioner = ConfigHelper.getOutputPartitioner(conf);
- refreshEndpointMap();
- }
-
-
- private void refreshEndpointMap()
- {
+ this.metadata = metadata;
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
- Session session = cluster.connect(keyspace))
- {
- rangeMap = new HashMap<>();
- metadata = session.getCluster().getMetadata();
- Set<TokenRange> ranges = metadata.getTokenRanges();
- for (TokenRange range : ranges)
- rangeMap.put(range, metadata.getReplicas(keyspace, range));
- }
+ this.rangeMap = metadata.getTokenRanges()
+ .stream()
+ .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
}
public TokenRange getRange(ByteBuffer key)
[2/2] cassandra git commit: Merge branch cassandra-3.0 into trunk
Posted by bl...@apache.org.
Merge branch cassandra-3.0 into trunk
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50f7e020
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50f7e020
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50f7e020
Branch: refs/heads/trunk
Commit: 50f7e020490ec20608ce2b4321a7fdc3d8943a92
Parents: 37986e8 2da3c9d
Author: blerer <be...@datastax.com>
Authored: Thu Dec 17 12:55:55 2015 +0100
Committer: blerer <be...@datastax.com>
Committed: Thu Dec 17 12:55:55 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlInputFormat.java | 46 +++++++-------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 63 +++++++-------------
3 files changed, 46 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50f7e020/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4b4a596,a2951a8..d49a75f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
-3.0.3
+3.2
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
+ * Sort compactionhistory output by timestamp (CASSANDRA-10464)
+ * More efficient BTree removal (CASSANDRA-9991)
+ * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
+ * Group pending compactions based on table (CASSANDRA-10718)
+ * Add compressor name in sstablemetadata output (CASSANDRA-9879)
+ * Fix type casting for counter columns (CASSANDRA-10824)
+ * Prevent running Cassandra as root (CASSANDRA-8142)
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
+ * Normalize all scripts (CASSANDRA-10679)
+ * Make compression ratio much more accurate (CASSANDRA-10225)
+ * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
+ * Make index building pluggable (CASSANDRA-10681)
+ * Add sstable flush observer (CASSANDRA-10678)
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
+ * Improve performance of the folderSize function (CASSANDRA-10677)
+ * Add support for type casting in selection clause (CASSANDRA-10310)
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+Merged from 3.0:
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
* Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
Merged from 2.2:
* Add new types to Stress (CASSANDRA-9556)