You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/03/25 16:52:31 UTC
[1/2] git commit: Revert "Add CqlRecordReader to take advantage of
native CQL pagination"
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 43092aac9 -> 6df316375
Revert "Add CqlRecordReader to take advantage of native CQL pagination"
This reverts commit 3b708f9989274cbe9e0e2a5fda6f1d0a3d96ebee.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0841d8f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0841d8f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0841d8f
Branch: refs/heads/cassandra-2.1
Commit: b0841d8fee4c89647894f056b1106b9ac67be8b9
Parents: e6c8034
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 25 16:49:10 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 25 16:49:10 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 -
build.xml | 4 +-
examples/hadoop_cql3_word_count/bin/word_count | 3 +-
.../bin/word_count_counters | 4 +-
.../hadoop_cql3_word_count/src/WordCount.java | 77 +--
.../src/WordCountCounters.java | 54 +-
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 541 +------------------
.../cassandra/hadoop/cql3/CqlInputFormat.java | 80 ---
.../hadoop/cql3/CqlPagingRecordReader.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 260 ---------
10 files changed, 30 insertions(+), 996 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed202cc..226855c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,7 +27,6 @@
* Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
* Extend triggers to support CAS updates (CASSANDRA-6882)
* Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
- * Add CqlRecordReader to take advantage of native CQL pagination (CASSANDRA-6311)
* Fix paging with SELECT DISTINCT (CASSANDRA-6857)
Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a15415b..464dece 100644
--- a/build.xml
+++ b/build.xml
@@ -380,7 +380,6 @@
<dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
<dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
@@ -411,7 +410,7 @@
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
<dependency groupId="org.apache.pig" artifactId="pig"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+
<dependency groupId="net.java.dev.jna" artifactId="jna"/>
</artifact:pom>
@@ -474,7 +473,6 @@
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
<dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
<!-- don't need jna to run, but nice to have -->
<dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count
index 974a39a..a0c5aa0 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count
+++ b/examples/hadoop_cql3_word_count/bin/word_count
@@ -56,7 +56,6 @@ if [ "x$JAVA" = "x" ]; then
fi
OUTPUT_REDUCER=cassandra
-INPUT_MAPPER=native
#echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..7793477 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -54,7 +54,5 @@ if [ "x$JAVA" = "x" ]; then
exit 1
fi
-INPUT_MAPPER=native
-
#echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 519a98f..bc81a53 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
-import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +37,10 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.datastax.driver.core.Row;
+
import java.nio.charset.CharacterCodingException;
/**
@@ -62,7 +60,7 @@ import java.nio.charset.CharacterCodingException;
public class WordCount extends Configured implements Tool
{
private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
- static final String INPUT_MAPPER_VAR = "input_mapper";
+
static final String KEYSPACE = "cql3_worldcount";
static final String COLUMN_FAMILY = "inputs";
@@ -70,6 +68,7 @@ public class WordCount extends Configured implements Tool
static final String OUTPUT_COLUMN_FAMILY = "output_words";
private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
+
private static final String PRIMARY_KEY = "row_key";
public static void main(String[] args) throws Exception
@@ -109,30 +108,6 @@ public class WordCount extends Configured implements Tool
}
}
- public static class NativeTokenizerMapper extends Mapper<Long, Row, Text, IntWritable>
- {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- private ByteBuffer sourceColumn;
-
- protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
- throws IOException, InterruptedException
- {
- }
-
- public void map(Long key, Row row, Context context) throws IOException, InterruptedException
- {
- String value = row.getString("line");
- logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()});
- StringTokenizer itr = new StringTokenizer(value);
- while (itr.hasMoreTokens())
- {
- word.set(itr.nextToken());
- context.write(word, one);
- }
- }
- }
-
public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -174,41 +149,17 @@ public class WordCount extends Configured implements Tool
public int run(String[] args) throws Exception
{
String outputReducerType = "filesystem";
- String inputMapperType = "native";
- String outputReducer = null;
- String inputMapper = null;
-
- if (args != null)
+ if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
{
- if(args[0].startsWith(OUTPUT_REDUCER_VAR))
- outputReducer = args[0];
- if(args[0].startsWith(INPUT_MAPPER_VAR))
- inputMapper = args[0];
-
- if (args.length == 2)
- {
- if(args[1].startsWith(OUTPUT_REDUCER_VAR))
- outputReducer = args[1];
- if(args[1].startsWith(INPUT_MAPPER_VAR))
- inputMapper = args[1];
- }
- }
-
- if (outputReducer != null)
- {
- String[] s = outputReducer.split("=");
+ String[] s = args[0].split("=");
if (s != null && s.length == 2)
outputReducerType = s[1];
}
logger.info("output reducer type: " + outputReducerType);
- if (inputMapper != null)
- {
- String[] s = inputMapper.split("=");
- if (s != null && s.length == 2)
- inputMapperType = s[1];
- }
+
Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
+ job.setMapperClass(TokenizerMapper.class);
if (outputReducerType.equalsIgnoreCase("filesystem"))
{
@@ -238,19 +189,9 @@ public class WordCount extends Configured implements Tool
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
}
- if (inputMapperType.equalsIgnoreCase("native"))
- {
- job.setMapperClass(NativeTokenizerMapper.class);
- job.setInputFormatClass(CqlInputFormat.class);
- CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering");
- }
- else
- {
- job.setMapperClass(TokenizerMapper.class);
- job.setInputFormatClass(CqlPagingInputFormat.class);
- ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
- }
+ job.setInputFormatClass(CqlPagingInputFormat.class);
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 74de9ab..542a473 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
-import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
@@ -38,7 +37,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.datastax.driver.core.Row;
+
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,7 +51,6 @@ public class WordCountCounters extends Configured implements Tool
{
private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
- static final String INPUT_MAPPER_VAR = "input_mapper";
static final String COUNTER_COLUMN_FAMILY = "input_words_count";
private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
@@ -63,24 +61,6 @@ public class WordCountCounters extends Configured implements Tool
System.exit(0);
}
- public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable>
- {
- long sum = -1;
- public void map(Long key, Row row, Context context) throws IOException, InterruptedException
- {
- if (sum < 0)
- sum = 0;
-
- logger.debug("read " + key + ":count_num from " + context.getInputSplit());
- sum += Long.valueOf(row.getString("count_num"));
- }
-
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (sum > 0)
- context.write(new Text("total_count"), new LongWritable(sum));
- }
- }
-
public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
{
long sum = -1;
@@ -115,6 +95,7 @@ public class WordCountCounters extends Configured implements Tool
}
}
+
public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
{
long sum = 0;
@@ -129,40 +110,25 @@ public class WordCountCounters extends Configured implements Tool
public int run(String[] args) throws Exception
{
- String inputMapperType = "native";
- if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
- {
- String[] arg0 = args[0].split("=");
- if (arg0 != null && arg0.length == 2)
- inputMapperType = arg0[1];
- }
Job job = new Job(getConf(), "wordcountcounters");
+ job.setJarByClass(WordCountCounters.class);
+ job.setMapperClass(SumMapper.class);
job.setCombinerClass(ReducerToFilesystem.class);
job.setReducerClass(ReducerToFilesystem.class);
- job.setJarByClass(WordCountCounters.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+
+ job.setInputFormatClass(CqlPagingInputFormat.class);
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
- if ("native".equals(inputMapperType))
- {
- job.setMapperClass(SumNativeMapper.class);
- job.setInputFormatClass(CqlInputFormat.class);
- CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
- }
- else
- {
- job.setMapperClass(SumMapper.class);
- job.setInputFormatClass(CqlPagingInputFormat.class);
- ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
- }
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
job.waitForCompletion(true);
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3672c84..cb61d05 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -19,69 +19,13 @@ package org.apache.cassandra.hadoop.cql3;
* under the License.
*
*/
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
-import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
-import com.datastax.driver.core.ProtocolOptions;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.SSLOptions;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-
public class CqlConfigHelper
{
private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
- private static final String INPUT_CQL = "cassandra.input.cql";
-
- private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port";
- private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host";
- private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host";
- private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection";
- private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection";
- private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout";
- private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout";
- private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size";
- private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size";
- private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger";
- private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay";
- private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address";
- private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive";
- private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider";
- private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path";
- private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path";
- private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = "cassandra.input.native.ssl.trust.store.password";
- private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
- private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
-
private static final String OUTPUT_CQL = "cassandra.output.cql";
/**
@@ -141,496 +85,25 @@ public class CqlConfigHelper
conf.set(OUTPUT_CQL, cql);
}
-
- public static void setInputCql(Configuration conf, String cql)
- {
- if (cql == null || cql.isEmpty())
- return;
-
- conf.set(INPUT_CQL, cql);
- }
-
- public static Optional<Integer> getInputCoreConnections(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf);
- }
-
- public static Optional<Integer> getInputMaxConnections(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf);
- }
-
- public static int getInputNativePort(Configuration conf)
- {
- return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
- }
-
- public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
- }
-
- public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf);
- }
-
- public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf);
- }
-
- public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf);
- }
-
- public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf);
- }
-
- public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf);
- }
-
- public static Optional<Integer> getInputNativeSolinger(Configuration conf)
- {
- return getIntSetting(INPUT_NATIVE_SOLINGER, conf);
- }
-
- public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf)
- {
- return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf);
- }
-
- public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf)
- {
- return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf);
- }
-
- public static Optional<String> getInputNativeAuthProvider(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf);
- }
-
- public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf);
- }
-
- public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf);
- }
-
- public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf);
- }
-
- public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf);
- }
-
- public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf)
- {
- return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf);
- }
-
- public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf)
- {
- return getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf);
- }
-
+
+
public static String getInputcolumns(Configuration conf)
{
return conf.get(INPUT_CQL_COLUMNS_CONFIG);
}
-
- public static Optional<Integer> getInputPageRowSize(Configuration conf)
+
+ public static String getInputPageRowSize(Configuration conf)
{
- return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf);
+ return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
}
-
+
public static String getInputWhereClauses(Configuration conf)
{
return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
}
-
- public static String getInputCql(Configuration conf)
- {
- return conf.get(INPUT_CQL);
- }
-
+
public static String getOutputCql(Configuration conf)
{
return conf.get(OUTPUT_CQL);
}
-
- public static Cluster getInputCluster(String host, Configuration conf)
- {
- int port = getInputNativePort(conf);
- Optional<AuthProvider> authProvider = getAuthProvider(conf);
- Optional<SSLOptions> sslOptions = getSSLOptions(conf);
- LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
- SocketOptions socketOptions = getReadSocketOptions(conf);
- QueryOptions queryOptions = getReadQueryOptions(conf);
- PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-
- Cluster.Builder builder = Cluster.builder()
- .addContactPoint(host)
- .withPort(port)
- .withCompression(ProtocolOptions.Compression.NONE);
-
- if (authProvider.isPresent())
- builder.withAuthProvider(authProvider.get());
- if (sslOptions.isPresent())
- builder.withSSL(sslOptions.get());
-
- builder.withLoadBalancingPolicy(loadBalancingPolicy)
- .withSocketOptions(socketOptions)
- .withQueryOptions(queryOptions)
- .withPoolingOptions(poolingOptions);
-
- return builder.build();
- }
-
- public static void setInputCoreConnections(Configuration conf, String connections)
- {
- conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
- }
-
- public static void setInputMaxConnections(Configuration conf, String connections)
- {
- conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections);
- }
-
- public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs)
- {
- conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs);
- }
-
- public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs)
- {
- conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs);
- }
-
- public static void setInputNativeConnectionTimeout(Configuration conf, String timeout)
- {
- conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout);
- }
-
- public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout)
- {
- conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout);
- }
-
- public static void setInputNativeReceiveBufferSize(Configuration conf, String size)
- {
- conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size);
- }
-
- public static void setInputNativeSendBufferSize(Configuration conf, String size)
- {
- conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size);
- }
-
- public static void setInputNativeSolinger(Configuration conf, String solinger)
- {
- conf.set(INPUT_NATIVE_SOLINGER, solinger);
- }
-
- public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay)
- {
- conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay);
- }
-
- public static void setInputNativeAuthProvider(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider);
- }
-
- public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, authProvider);
- }
-
- public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider);
- }
-
- public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider);
- }
-
- public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider);
- }
-
- public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider)
- {
- conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider);
- }
-
- public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress)
- {
- conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress);
- }
-
- public static void setInputNativeKeepAlive(Configuration conf, String keepAlive)
- {
- conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive);
- }
-
- public static void setInputNativePort(Configuration conf, String port)
- {
- conf.set(INPUT_NATIVE_PORT, port);
- }
-
- private static PoolingOptions getReadPoolingOptions(Configuration conf)
- {
- Optional<Integer> coreConnections = getInputCoreConnections(conf);
- Optional<Integer> maxConnections = getInputMaxConnections(conf);
- Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf);
- Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf);
-
- PoolingOptions poolingOptions = new PoolingOptions();
-
- if (coreConnections.isPresent())
- poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get());
- if (maxConnections.isPresent())
- poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get());
- if (maxSimultaneousRequests.isPresent())
- poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get());
- if (minSimultaneousRequests.isPresent())
- poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get());
-
- poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0)
- .setMaxConnectionsPerHost(HostDistance.REMOTE, 0)
- .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0)
- .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0);
-
- return poolingOptions;
- }
-
- private static QueryOptions getReadQueryOptions(Configuration conf)
- {
- String CL = ConfigHelper.getReadConsistencyLevel(conf);
- Optional<Integer> fetchSize = getInputPageRowSize(conf);
- QueryOptions queryOptions = new QueryOptions();
- if (CL != null && !CL.isEmpty())
- queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL));
-
- if (fetchSize.isPresent())
- queryOptions.setFetchSize(fetchSize.get());
- return queryOptions;
- }
-
- private static SocketOptions getReadSocketOptions(Configuration conf)
- {
- SocketOptions socketOptions = new SocketOptions();
- Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf);
- Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf);
- Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf);
- Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf);
- Optional<Integer> soLinger = getInputNativeSolinger(conf);
- Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf);
- Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf);
- Optional<Boolean> keepAlive = getInputNativeKeepAlive(conf);
-
- if (connectTimeoutMillis.isPresent())
- socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get());
- if (readTimeoutMillis.isPresent())
- socketOptions.setReadTimeoutMillis(readTimeoutMillis.get());
- if (receiveBufferSize.isPresent())
- socketOptions.setReceiveBufferSize(receiveBufferSize.get());
- if (sendBufferSize.isPresent())
- socketOptions.setSendBufferSize(sendBufferSize.get());
- if (soLinger.isPresent())
- socketOptions.setSoLinger(soLinger.get());
- if (tcpNoDelay.isPresent())
- socketOptions.setTcpNoDelay(tcpNoDelay.get());
- if (reuseAddress.isPresent())
- socketOptions.setReuseAddress(reuseAddress.get());
- if (keepAlive.isPresent())
- socketOptions.setKeepAlive(keepAlive.get());
-
- return socketOptions;
- }
-
- private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
- {
- return new LoadBalancingPolicy()
- {
- private Host origHost;
- private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
- @Override
- public void onAdd(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- }
-
- @Override
- public void onDown(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onRemove(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = null;
- liveRemoteHosts.remove(host);
- }
-
- @Override
- public void onUp(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- origHost = host;
- liveRemoteHosts.add(host);
- }
-
- @Override
- public HostDistance distance(Host host)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- return HostDistance.LOCAL;
- else
- return HostDistance.REMOTE;
- }
-
- @Override
- public void init(Cluster cluster, Collection<Host> hosts)
- {
- for (Host host : hosts)
- {
- if (host.getAddress().getHostName().equals(stickHost))
- {
- origHost = host;
- break;
- }
- }
- }
-
- @Override
- public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
- {
- if (origHost != null)
- {
- return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
- }
- else
- {
- return liveRemoteHosts.iterator();
- }
- }
- };
- }
-
- private static Optional<AuthProvider> getAuthProvider(Configuration conf)
- {
- Optional<String> authProvider = getInputNativeAuthProvider(conf);
- if (!authProvider.isPresent())
- return Optional.absent();
-
- return Optional.of(getClientAuthProvider(authProvider.get()));
- }
-
- private static Optional<SSLOptions> getSSLOptions(Configuration conf)
- {
- Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
- Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
- Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf);
- Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf);
- Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf);
-
- if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent())
- {
- SSLContext context;
- try
- {
- context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get());
- }
- catch (UnrecoverableKeyException | KeyManagementException |
- NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e)
- {
- throw new RuntimeException(e);
- }
- String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES;
- if (cipherSuites.isPresent())
- css = cipherSuites.get().split(",");
- return Optional.of(new SSLOptions(context,css));
- }
- return Optional.absent();
- }
-
- private static Optional<Integer> getIntSetting(String parameter, Configuration conf)
- {
- String setting = conf.get(parameter);
- if (setting == null)
- return Optional.absent();
- return Optional.of(Integer.parseInt(setting));
- }
-
- private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf)
- {
- String setting = conf.get(parameter);
- if (setting == null)
- return Optional.absent();
- return Optional.of(Boolean.parseBoolean(setting));
- }
-
- private static Optional<String> getStringSetting(String parameter, Configuration conf)
- {
- String setting = conf.get(parameter);
- if (setting == null)
- return Optional.absent();
- return Optional.of(setting);
- }
-
- private static AuthProvider getClientAuthProvider(String factoryClassName)
- {
- try
- {
- return (AuthProvider) Class.forName(factoryClassName).newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e);
- }
- }
-
- private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
- throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
- {
-
- FileInputStream tsf = new FileInputStream(truststorePath);
- FileInputStream ksf = new FileInputStream(keystorePath);
- SSLContext ctx = SSLContext.getInstance("SSL");
-
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(tsf, truststorePassword.toCharArray());
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- tmf.init(ts);
-
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(ksf, keystorePassword.toCharArray());
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(ks, keystorePassword.toCharArray());
-
- ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
- return ctx;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
deleted file mode 100644
index e1cdf32..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import com.datastax.driver.core.Row;
-
-/**
- * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
- *
- * At minimum, you need to set the KS and CF in your Hadoop job Configuration.
- * The ConfigHelper class is provided to make this
- * simple:
- * ConfigHelper.setInputColumnFamily
- *
- * You can also configure the number of rows per InputSplit with
- * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
- *
- * the number of CQL rows per page
- * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
- * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL
- * query, so you need set it big enough to minimize the network overhead, and also
- * not too big to avoid out of memory issue.
- *
- * other native protocol connection parameters in CqlConfigHelper
- */
-public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
-{
- public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
- throws IOException
- {
- TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
- {
- @Override
- public void progress()
- {
- reporter.progress();
- }
- };
-
- CqlRecordReader recordReader = new CqlRecordReader();
- recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
- return recordReader;
- }
-
- @Override
- public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
- org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
- InterruptedException
- {
- return new CqlRecordReader();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index b692280..cee4b4b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
try
{
- pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get();
+ pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
}
catch (NumberFormatException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
deleted file mode 100644
index a19cf70..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.hadoop.ColumnFamilySplit;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-/**
- * CqlRecordReader reads the rows return from the CQL query
- * It uses CQL auto-paging.
- * <p/>
- * Return a Long as a local CQL row key starts from 0;
- * <p/>
- * Row as C* java driver CQL result set row
- * 1) select clause must include partition key columns (to calculate the progress based on the actual CF row processed)
- * 2) where clause must include token(partition_key1, ... , partition_keyn) > ? and
- * token(partition_key1, ... , partition_keyn) <= ? (in the right order)
- */
-public class CqlRecordReader extends RecordReader<Long, Row>
- implements org.apache.hadoop.mapred.RecordReader<Long, Row>
-{
- private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
-
- private ColumnFamilySplit split;
- private RowIterator rowIterator;
-
- private Pair<Long, Row> currentRow;
- private int totalRowCount; // total number of rows to fetch
- private String keyspace;
- private String cfName;
- private String cqlQuery;
- private Cluster cluster;
- private Session session;
- private IPartitioner partitioner;
-
- // partition keys -- key aliases
- private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
-
- public CqlRecordReader()
- {
- super();
- }
-
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
- {
- this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
- totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
- ? (int) this.split.getLength()
- : ConfigHelper.getInputSplitSize(conf);
- cfName = ConfigHelper.getInputColumnFamily(conf);
- keyspace = ConfigHelper.getInputKeyspace(conf);
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
- try
- {
- if (cluster != null)
- return;
-
- // create connection using thrift
- String[] locations = split.getLocations();
- Exception lastException = null;
- for (String location : locations)
- {
- try
- {
- cluster = CqlConfigHelper.getInputCluster(location, conf);
- break;
- }
- catch (Exception e)
- {
- lastException = e;
- logger.warn("Failed to create authenticated client to {}", location);
- }
- }
- if (cluster == null && lastException != null)
- throw lastException;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- session = cluster.connect(keyspace);
- rowIterator = new RowIterator();
- logger.debug("created {}", rowIterator);
- }
-
- public void close()
- {
- if (session != null)
- session.close();
- }
-
- public Long getCurrentKey()
- {
- return currentRow.left;
- }
-
- public Row getCurrentValue()
- {
- return currentRow.right;
- }
-
- public float getProgress()
- {
- if (!rowIterator.hasNext())
- return 1.0F;
-
- // the progress is likely to be reported slightly off the actual but close enough
- float progress = ((float) rowIterator.totalRead / totalRowCount);
- return progress > 1.0F ? 1.0F : progress;
- }
-
- public boolean nextKeyValue() throws IOException
- {
- if (!rowIterator.hasNext())
- {
- logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount);
- return false;
- }
-
- try
- {
- currentRow = rowIterator.next();
- }
- catch (Exception e)
- {
- // throw it as IOException, so client can catch it and handle it at client side
- IOException ioe = new IOException(e.getMessage());
- ioe.initCause(ioe.getCause());
- throw ioe;
- }
- return true;
- }
-
- // Because the old Hadoop API wants us to write to the key and value
- // and the new asks for them, we need to copy the output of the new API
- // to the old. Thus, expect a small performance hit.
- // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
- // and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(Long key, Row value) throws IOException
- {
- if (nextKeyValue())
- {
- key = getCurrentKey();
- value = getCurrentValue();
- return true;
- }
- return false;
- }
-
- public long getPos() throws IOException
- {
- return (long) rowIterator.totalRead;
- }
-
- public Long createKey()
- {
- return null;
- }
-
- public Row createValue()
- {
- return null;
- }
-
- /** CQL row iterator
- * Input cql query
- * 1) select clause must include key columns (if we use partition key based row count)
- * 2) where clause must include token(partition_key1 ... partition_keyn) > ? and
- * token(partition_key1 ... partition_keyn) <= ?
- */
- private class RowIterator extends AbstractIterator<Pair<Long, Row>>
- {
- private long keyId = 0L;
- protected int totalRead = 0; // total number of cf rows read
- protected Iterator<Row> rows;
- private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key
-
- public RowIterator()
- {
- AbstractType type = partitioner.getTokenValidator();
- ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
- for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
- partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
- rows = rs.iterator();
- }
-
- protected Pair<Long, Row> computeNext()
- {
- if (rows == null || !rows.hasNext())
- return endOfData();
-
- Row row = rows.next();
- Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>();
- for (String column : partitionBoundColumns.keySet())
- keyColumns.put(column, row.getBytesUnsafe(column));
-
- // increase total CF row read
- if (previousRowKey.isEmpty() && !keyColumns.isEmpty())
- {
- previousRowKey = keyColumns;
- totalRead++;
- }
- else
- {
- for (String column : partitionBoundColumns.keySet())
- {
- if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0)
- {
- previousRowKey = keyColumns;
- totalRead++;
- break;
- }
- }
- }
- keyId ++;
- return Pair.create(keyId, row);
- }
- }
-}
[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
build.xml
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6df31637
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6df31637
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6df31637
Branch: refs/heads/cassandra-2.1
Commit: 6df316375209410fbfe1e40059d25c915fa5fbb5
Parents: 43092aa b0841d8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 25 16:52:21 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 25 16:52:21 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 -
build.xml | 3 -
examples/hadoop_cql3_word_count/bin/word_count | 3 +-
.../bin/word_count_counters | 4 +-
.../hadoop_cql3_word_count/src/WordCount.java | 77 +--
.../src/WordCountCounters.java | 54 +-
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 541 +------------------
.../cassandra/hadoop/cql3/CqlInputFormat.java | 80 ---
.../hadoop/cql3/CqlPagingRecordReader.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 260 ---------
10 files changed, 29 insertions(+), 996 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/build.xml
----------------------------------------------------------------------
diff --cc build.xml
index 6656015,464dece..83ce003
--- a/build.xml
+++ b/build.xml
@@@ -383,12 -377,9 +383,11 @@@
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" />
<dependency groupId="com.yammer.metrics" artifactId="metrics-core" version="2.2.0" />
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
- <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
+ <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
+ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
<dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
@@@ -419,8 -410,8 +418,7 @@@
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
<dependency groupId="org.apache.pig" artifactId="pig"/>
-
- <dependency groupId="net.java.dev.jna" artifactId="jna"/>
+ <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
</artifact:pom>
<artifact:pom id="coverage-deps-pom"
@@@ -486,10 -473,9 +484,9 @@@
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
<dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
- <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
<!-- don't need jna to run, but nice to have -->
- <dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/>
+ <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/>
<!-- don't need jamm unless running a server in which case it needs to be a -javagent to be used anyway -->
<dependency groupId="com.github.stephenc" artifactId="jamm"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------