Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/03/25 16:52:31 UTC

[1/2] git commit: Revert "Add CqlRecordReader to take advantage of native CQL pagination"

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 43092aac9 -> 6df316375


Revert "Add CqlRecordReader to take advantage of native CQL pagination"

This reverts commit 3b708f9989274cbe9e0e2a5fda6f1d0a3d96ebee.

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0841d8f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0841d8f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0841d8f

Branch: refs/heads/cassandra-2.1
Commit: b0841d8fee4c89647894f056b1106b9ac67be8b9
Parents: e6c8034
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 25 16:49:10 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 25 16:49:10 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 build.xml                                       |   4 +-
 examples/hadoop_cql3_word_count/bin/word_count  |   3 +-
 .../bin/word_count_counters                     |   4 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |  77 +--
 .../src/WordCountCounters.java                  |  54 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  | 541 +------------------
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  80 ---
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 260 ---------
 10 files changed, 30 insertions(+), 996 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed202cc..226855c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,7 +27,6 @@
  * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
  * Extend triggers to support CAS updates (CASSANDRA-6882)
  * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
- * Add CqlRecordReader to take advantage of native CQL pagination (CASSANDRA-6311)
  * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a15415b..464dece 100644
--- a/build.xml
+++ b/build.xml
@@ -380,7 +380,6 @@
           <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
-          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
         </dependencyManagement>
         <developer id="alakshman" name="Avinash Lakshman"/>
@@ -411,7 +410,7 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
         <dependency groupId="org.apache.pig" artifactId="pig"/>
-        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
       </artifact:pom>
 
@@ -474,7 +473,6 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
-      	<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
 
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count
index 974a39a..a0c5aa0 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count
+++ b/examples/hadoop_cql3_word_count/bin/word_count
@@ -56,7 +56,6 @@ if [ "x$JAVA" = "x" ]; then
 fi
 
 OUTPUT_REDUCER=cassandra
-INPUT_MAPPER=native
 
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..7793477 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -54,7 +54,5 @@ if [ "x$JAVA" = "x" ]; then
     exit 1
 fi
 
-INPUT_MAPPER=native
-
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 519a98f..bc81a53 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
-import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +37,10 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import com.datastax.driver.core.Row;
+
 import java.nio.charset.CharacterCodingException;
 
 /**
@@ -62,7 +60,7 @@ import java.nio.charset.CharacterCodingException;
 public class WordCount extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
-    static final String INPUT_MAPPER_VAR = "input_mapper";
+
     static final String KEYSPACE = "cql3_worldcount";
     static final String COLUMN_FAMILY = "inputs";
 
@@ -70,6 +68,7 @@ public class WordCount extends Configured implements Tool
     static final String OUTPUT_COLUMN_FAMILY = "output_words";
 
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
+
     private static final String PRIMARY_KEY = "row_key";
 
     public static void main(String[] args) throws Exception
@@ -109,30 +108,6 @@ public class WordCount extends Configured implements Tool
         }
     }
 
-    public static class NativeTokenizerMapper extends Mapper<Long, Row, Text, IntWritable>
-    {
-        private final static IntWritable one = new IntWritable(1);
-        private Text word = new Text();
-        private ByteBuffer sourceColumn;
-
-        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
-        throws IOException, InterruptedException
-        {
-        }
-
-        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
-        {
-            String value = row.getString("line");
-            logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()});
-            StringTokenizer itr = new StringTokenizer(value);
-            while (itr.hasMoreTokens())
-            {
-                word.set(itr.nextToken());
-                context.write(word, one);
-            }
-        }
-    }
-
     public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
     {
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -174,41 +149,17 @@ public class WordCount extends Configured implements Tool
     public int run(String[] args) throws Exception
     {
         String outputReducerType = "filesystem";
-        String inputMapperType = "native";
-        String outputReducer = null;
-        String inputMapper = null;
-
-        if (args != null)
+        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
         {
-            if(args[0].startsWith(OUTPUT_REDUCER_VAR))
-                outputReducer = args[0];
-            if(args[0].startsWith(INPUT_MAPPER_VAR))
-                inputMapper = args[0];
-            
-            if (args.length == 2)
-            {
-                if(args[1].startsWith(OUTPUT_REDUCER_VAR))
-                    outputReducer = args[1];
-                if(args[1].startsWith(INPUT_MAPPER_VAR))
-                    inputMapper = args[1]; 
-            }
-        }
-
-        if (outputReducer != null)
-        {
-            String[] s = outputReducer.split("=");
+            String[] s = args[0].split("=");
             if (s != null && s.length == 2)
                 outputReducerType = s[1];
         }
         logger.info("output reducer type: " + outputReducerType);
-        if (inputMapper != null)
-        {
-            String[] s = inputMapper.split("=");
-            if (s != null && s.length == 2)
-                inputMapperType = s[1];
-        }
+
         Job job = new Job(getConf(), "wordcount");
         job.setJarByClass(WordCount.class);
+        job.setMapperClass(TokenizerMapper.class);
 
         if (outputReducerType.equalsIgnoreCase("filesystem"))
         {
@@ -238,19 +189,9 @@ public class WordCount extends Configured implements Tool
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         }
 
-        if (inputMapperType.equalsIgnoreCase("native"))
-        {
-            job.setMapperClass(NativeTokenizerMapper.class);
-            job.setInputFormatClass(CqlInputFormat.class);
-            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering");
-        }
-        else
-        {
-            job.setMapperClass(TokenizerMapper.class);
-            job.setInputFormatClass(CqlPagingInputFormat.class);
-            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
-        }
+        job.setInputFormatClass(CqlPagingInputFormat.class);
 
+        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
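
For reference, after this revert the example has a single input path again; the job wiring that remains in run() amounts to the following (a minimal sketch assembled from the context lines above, using the example's own constants):

    Job job = new Job(getConf(), "wordcount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setInputFormatClass(CqlPagingInputFormat.class);

    // CqlPagingInputFormat reads over Thrift, hence the RPC port rather than the native port.
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");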

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 74de9ab..542a473 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
-import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -38,7 +37,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import com.datastax.driver.core.Row;
+
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -52,7 +51,6 @@ public class WordCountCounters extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
 
-    static final String INPUT_MAPPER_VAR = "input_mapper";
     static final String COUNTER_COLUMN_FAMILY = "input_words_count";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
 
@@ -63,24 +61,6 @@ public class WordCountCounters extends Configured implements Tool
         System.exit(0);
     }
 
-    public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable>
-    {
-        long sum = -1;
-        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
-        {   
-            if (sum < 0)
-                sum = 0;
-
-            logger.debug("read " + key + ":count_num from " + context.getInputSplit());
-            sum += Long.valueOf(row.getString("count_num"));
-        }
-
-        protected void cleanup(Context context) throws IOException, InterruptedException {
-            if (sum > 0)
-                context.write(new Text("total_count"), new LongWritable(sum));
-        }
-    }
-
     public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
     {
         long sum = -1;
@@ -115,6 +95,7 @@ public class WordCountCounters extends Configured implements Tool
         }
     }
 
+    
     public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
     {
         long sum = 0;
@@ -129,40 +110,25 @@ public class WordCountCounters extends Configured implements Tool
 
     public int run(String[] args) throws Exception
     {
-        String inputMapperType = "native";
-        if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
-        {
-            String[] arg0 = args[0].split("=");
-            if (arg0 != null && arg0.length == 2)
-                inputMapperType = arg0[1];
-        }
         Job job = new Job(getConf(), "wordcountcounters");
+        job.setJarByClass(WordCountCounters.class);
+        job.setMapperClass(SumMapper.class);
 
         job.setCombinerClass(ReducerToFilesystem.class);
         job.setReducerClass(ReducerToFilesystem.class);
-        job.setJarByClass(WordCountCounters.class); 
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+
+        job.setInputFormatClass(CqlPagingInputFormat.class);
 
+        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
 
         CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
-        if ("native".equals(inputMapperType))
-        {
-            job.setMapperClass(SumNativeMapper.class);
-            job.setInputFormatClass(CqlInputFormat.class);
-            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
-        }
-        else
-        {
-            job.setMapperClass(SumMapper.class);
-            job.setInputFormatClass(CqlPagingInputFormat.class);
-            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
-        }
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(LongWritable.class);
-        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
         job.waitForCompletion(true);
         return 0;
     }
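
With the native path gone, the job above always feeds SumMapper, whose signature appears in the context lines: each row arrives as two maps from column name to raw bytes. A sketch of the per-row decoding under that signature; the "count_num" column name and its text encoding are carried over from the deleted SumNativeMapper and are assumptions about the example schema:

    public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context)
            throws IOException, InterruptedException
    {
        ByteBuffer raw = columns.get("count_num");              // raw column bytes from the paging reader
        if (raw != null)
            sum += Long.parseLong(ByteBufferUtil.string(raw));  // assumes the count is stored as text
    }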

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3672c84..cb61d05 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -19,69 +19,13 @@ package org.apache.cassandra.hadoop.cql3;
 * under the License.
 *
 */
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.hadoop.conf.Configuration;
 
-import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
-import com.datastax.driver.core.ProtocolOptions;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.SSLOptions;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-
 public class CqlConfigHelper
 {
     private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
-    private static final String INPUT_CQL = "cassandra.input.cql";
-
-    private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port";
-    private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host";
-    private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host";
-    private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection"; 
-    private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection";
-    private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout";
-    private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout";
-    private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size";
-    private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size";
-    private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger";
-    private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay";
-    private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address";
-    private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive";
-    private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider";
-    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path";
-    private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path";
-    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = "cassandra.input.native.ssl.trust.store.password";
-    private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
-    private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
-
     private static final String OUTPUT_CQL = "cassandra.output.cql";
 
     /**
@@ -141,496 +85,25 @@ public class CqlConfigHelper
         
         conf.set(OUTPUT_CQL, cql);
     }
-
-    public static void setInputCql(Configuration conf, String cql)
-    {
-        if (cql == null || cql.isEmpty())
-            return;
-
-        conf.set(INPUT_CQL, cql);
-    }
-
-    public static Optional<Integer> getInputCoreConnections(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf);
-    }
-
-    public static Optional<Integer> getInputMaxConnections(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf);
-    }
-
-    public static int getInputNativePort(Configuration conf)
-    {
-        return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
-    }
-
-    public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
-    }
-
-    public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf);
-    }
-
-    public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf);
-    }
-
-    public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf);
-    }
-
-    public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf);
-    }
-
-    public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf);
-    }
-
-    public static Optional<Integer> getInputNativeSolinger(Configuration conf)
-    {
-        return getIntSetting(INPUT_NATIVE_SOLINGER, conf);
-    }
-
-    public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf)
-    {
-        return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf);
-    }
-
-    public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf)
-    {
-        return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf);
-    }
-
-    public static Optional<String> getInputNativeAuthProvider(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf);
-    }
-
-    public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf);
-    }
-
-    public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf);
-    }
-
-    public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf);
-    }
-
-    public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf);
-    }
-
-    public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf)
-    {
-        return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf);
-    }
-
-    public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf)
-    {
-        return getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf);
-    }
-
+    
+    
     public static String getInputcolumns(Configuration conf)
     {
         return conf.get(INPUT_CQL_COLUMNS_CONFIG);
     }
-
-    public static Optional<Integer> getInputPageRowSize(Configuration conf)
+    
+    public static String getInputPageRowSize(Configuration conf)
     {
-        return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf);
+        return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
     }
-
+    
     public static String getInputWhereClauses(Configuration conf)
     {
         return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
     }
-
-    public static String getInputCql(Configuration conf)
-    {
-        return conf.get(INPUT_CQL);
-    }
-
+    
     public static String getOutputCql(Configuration conf)
     {
         return conf.get(OUTPUT_CQL);
     }
-
-    public static Cluster getInputCluster(String host, Configuration conf)
-    {
-        int port = getInputNativePort(conf);
-        Optional<AuthProvider> authProvider = getAuthProvider(conf);
-        Optional<SSLOptions> sslOptions = getSSLOptions(conf);
-        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
-        SocketOptions socketOptions = getReadSocketOptions(conf);
-        QueryOptions queryOptions = getReadQueryOptions(conf);
-        PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-        
-        Cluster.Builder builder = Cluster.builder()
-                                         .addContactPoint(host)
-                                         .withPort(port)
-                                         .withCompression(ProtocolOptions.Compression.NONE);
-
-        if (authProvider.isPresent())
-            builder.withAuthProvider(authProvider.get());
-        if (sslOptions.isPresent())
-            builder.withSSL(sslOptions.get());
-
-        builder.withLoadBalancingPolicy(loadBalancingPolicy)
-               .withSocketOptions(socketOptions)
-               .withQueryOptions(queryOptions)
-               .withPoolingOptions(poolingOptions);
-
-        return builder.build();
-    }
-
-    public static void setInputCoreConnections(Configuration conf, String connections)
-    {
-        conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
-    }
-
-    public static void setInputMaxConnections(Configuration conf, String connections)
-    {
-        conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections);
-    }
-
-    public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs)
-    {
-        conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs);
-    }
-
-    public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs)
-    {
-        conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs);
-    }    
-
-    public static void setInputNativeConnectionTimeout(Configuration conf, String timeout)
-    {
-        conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout);
-    }
-
-    public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout)
-    {
-        conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout);
-    }
-
-    public static void setInputNativeReceiveBufferSize(Configuration conf, String size)
-    {
-        conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size);
-    }
-
-    public static void setInputNativeSendBufferSize(Configuration conf, String size)
-    {
-        conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size);
-    }
-
-    public static void setInputNativeSolinger(Configuration conf, String solinger)
-    {
-        conf.set(INPUT_NATIVE_SOLINGER, solinger);
-    }
-
-    public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay)
-    {
-        conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay);
-    }
-
-    public static void setInputNativeAuthProvider(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider);
-    }
-
-    public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, authProvider);
-    } 
-
-    public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider);
-    }
-
-    public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider);
-    }
-
-    public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider);
-    }
-
-    public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider)
-    {
-        conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider);
-    }
-
-    public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress)
-    {
-        conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress);
-    }
-
-    public static void setInputNativeKeepAlive(Configuration conf, String keepAlive)
-    {
-        conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive);
-    }
-
-    public static void setInputNativePort(Configuration conf, String port)
-    {
-        conf.set(INPUT_NATIVE_PORT, port);
-    }
-
-    private static PoolingOptions getReadPoolingOptions(Configuration conf)
-    {
-        Optional<Integer> coreConnections = getInputCoreConnections(conf);
-        Optional<Integer> maxConnections = getInputMaxConnections(conf);
-        Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf);
-        Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf);
-        
-        PoolingOptions poolingOptions = new PoolingOptions();
-
-        if (coreConnections.isPresent())
-            poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get());
-        if (maxConnections.isPresent())
-            poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get());
-        if (maxSimultaneousRequests.isPresent())
-            poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get());
-        if (minSimultaneousRequests.isPresent())
-            poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get());
-
-        poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0)
-                      .setMaxConnectionsPerHost(HostDistance.REMOTE, 0)
-                      .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0)
-                      .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0);
-
-        return poolingOptions;
-    }  
-
-    private static QueryOptions getReadQueryOptions(Configuration conf)
-    {
-        String CL = ConfigHelper.getReadConsistencyLevel(conf);
-        Optional<Integer> fetchSize = getInputPageRowSize(conf);
-        QueryOptions queryOptions = new QueryOptions();
-        if (CL != null && !CL.isEmpty())
-            queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL));
-
-        if (fetchSize.isPresent())
-            queryOptions.setFetchSize(fetchSize.get());
-        return queryOptions;
-    }
-
-    private static SocketOptions getReadSocketOptions(Configuration conf)
-    {
-        SocketOptions socketOptions = new SocketOptions();
-        Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf);
-        Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf);
-        Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf);
-        Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf);
-        Optional<Integer> soLinger = getInputNativeSolinger(conf);
-        Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf);
-        Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf);       
-        Optional<Boolean> keepAlive = getInputNativeKeepAlive(conf);
-
-        if (connectTimeoutMillis.isPresent())
-            socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get());
-        if (readTimeoutMillis.isPresent())
-            socketOptions.setReadTimeoutMillis(readTimeoutMillis.get());
-        if (receiveBufferSize.isPresent())
-            socketOptions.setReceiveBufferSize(receiveBufferSize.get());
-        if (sendBufferSize.isPresent())
-            socketOptions.setSendBufferSize(sendBufferSize.get());
-        if (soLinger.isPresent())
-            socketOptions.setSoLinger(soLinger.get());
-        if (tcpNoDelay.isPresent())
-            socketOptions.setTcpNoDelay(tcpNoDelay.get());
-        if (reuseAddress.isPresent())
-            socketOptions.setReuseAddress(reuseAddress.get());
-        if (keepAlive.isPresent())
-            socketOptions.setKeepAlive(keepAlive.get());     
-
-        return socketOptions;
-    }
-
-    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
-    {
-        return new LoadBalancingPolicy()
-        {
-            private Host origHost;
-            private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
-            @Override
-            public void onAdd(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-            }
-
-            @Override
-            public void onDown(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onRemove(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onUp(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-                liveRemoteHosts.add(host);
-            }
-
-            @Override
-            public HostDistance distance(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    return HostDistance.LOCAL;
-                else
-                    return HostDistance.REMOTE;
-            }
-
-            @Override
-            public void init(Cluster cluster, Collection<Host> hosts)
-            {
-                for (Host host : hosts)
-                {
-                    if (host.getAddress().getHostName().equals(stickHost))
-                    {
-                        origHost = host;
-                        break;
-                    }
-                }
-            }
-
-            @Override
-            public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
-            {
-                if (origHost != null)
-                {
-                    return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
-                }
-                else
-                {
-                    return liveRemoteHosts.iterator();
-                }
-            }
-        };
-    }
-
-    private static Optional<AuthProvider> getAuthProvider(Configuration conf)
-    {
-        Optional<String> authProvider = getInputNativeAuthProvider(conf);
-        if (!authProvider.isPresent())
-            return Optional.absent();
-
-        return Optional.of(getClientAuthProvider(authProvider.get()));  
-    }
-
-    private static Optional<SSLOptions> getSSLOptions(Configuration conf)
-    {
-        Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
-        Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
-        Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf);
-        Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf);
-        Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf);
-        
-        if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent())
-        {
-            SSLContext context;
-            try
-            {
-                context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get());
-            }
-            catch (UnrecoverableKeyException | KeyManagementException |
-                    NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES;
-            if (cipherSuites.isPresent())
-                css = cipherSuites.get().split(",");
-            return Optional.of(new SSLOptions(context,css));
-        }
-        return Optional.absent();
-    }
-
-    private static Optional<Integer> getIntSetting(String parameter, Configuration conf)
-    {
-        String setting = conf.get(parameter);
-        if (setting == null)
-            return Optional.absent();
-        return Optional.of(Integer.parseInt(setting));  
-    }
-
-    private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf)
-    {
-        String setting = conf.get(parameter);
-        if (setting == null)
-            return Optional.absent();
-        return Optional.of(Boolean.parseBoolean(setting));  
-    }
-
-    private static Optional<String> getStringSetting(String parameter, Configuration conf)
-    {
-        String setting = conf.get(parameter);
-        if (setting == null)
-            return Optional.absent();
-        return Optional.of(setting);  
-    }
-
-    private static AuthProvider getClientAuthProvider(String factoryClassName)
-    {
-        try
-        {
-            return (AuthProvider) Class.forName(factoryClassName).newInstance();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e);
-        }
-    }
-
-    private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
-            throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
-    {
-        
-        FileInputStream tsf = new FileInputStream(truststorePath);
-        FileInputStream ksf = new FileInputStream(keystorePath);
-        SSLContext ctx = SSLContext.getInstance("SSL");
-
-        KeyStore ts = KeyStore.getInstance("JKS");
-        ts.load(tsf, truststorePassword.toCharArray());
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        tmf.init(ts);
-
-        KeyStore ks = KeyStore.getInstance("JKS");
-        ks.load(ksf, keystorePassword.toCharArray());
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        kmf.init(ks, keystorePassword.toCharArray());
-
-        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
-        return ctx;
-    }
 }
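
Note the API shape this restores: getInputPageRowSize once again returns the raw configuration string (possibly null) rather than an Optional<Integer>, so callers parse it defensively. A sketch of the resulting calling pattern, mirroring the CqlPagingRecordReader hunk below; the fallback of 1000 is the default named in CqlInputFormat's javadoc:

    int pageRowSize;
    try
    {
        // Integer.parseInt(null) also throws NumberFormatException, so the unset case is covered.
        pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
    }
    catch (NumberFormatException e)
    {
        pageRowSize = 1000; // documented default page row size
    }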

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
deleted file mode 100644
index e1cdf32..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import com.datastax.driver.core.Row;
-
-/**
- * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
- *
- * At minimum, you need to set the KS and CF in your Hadoop job Configuration.  
- * The ConfigHelper class is provided to make this
- * simple:
- *   ConfigHelper.setInputColumnFamily
- *
- * You can also configure the number of rows per InputSplit with
- *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
- *
- *   the number of CQL rows per page
- *   CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You 
- *   should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL 
- *   query, so you need set it big enough to minimize the network overhead, and also
- *   not too big to avoid out of memory issue.
- *   
- *   other native protocol connection parameters in CqlConfigHelper
- */
-public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
-{
-    public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
-            throws IOException
-    {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
-        {
-            @Override
-            public void progress()
-            {
-                reporter.progress();
-            }
-        };
-
-        CqlRecordReader recordReader = new CqlRecordReader();
-        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
-        return recordReader;
-    }
-
-    @Override
-    public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
-            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
-            InterruptedException
-    {
-        return new CqlRecordReader();
-    }
-
-}
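
Per the javadoc of the format being deleted, wiring it up took only a handful of calls. A minimal sketch for the record (keyspace and table names here are placeholders, not part of the original):

    Configuration conf = job.getConfiguration();
    ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_table"); // required: keyspace and CF
    ConfigHelper.setInputSplitSize(conf, 64 * 1024);                    // optional; default is 64k rows per split
    CqlConfigHelper.setInputCQLPageRowSize(conf, "1000");               // CQL LIMIT per page; default 1000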

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index b692280..cee4b4b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
         try
         {
-            pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get();
+            pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
         }
         catch (NumberFormatException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
deleted file mode 100644
index a19cf70..0000000
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.cql3;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.hadoop.ColumnFamilySplit;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-/**
- * CqlRecordReader reads the rows return from the CQL query
- * It uses CQL auto-paging.
- * <p/>
- * Return a Long as a local CQL row key starts from 0;
- * <p/>
- * Row as C* java driver CQL result set row
- * 1) select clause must include partition key columns (to calculate the progress based on the actual CF row processed)
- * 2) where clause must include token(partition_key1, ...  , partition_keyn) > ? and 
- *       token(partition_key1, ... , partition_keyn) <= ?  (in the right order) 
- */
-public class CqlRecordReader extends RecordReader<Long, Row>
-        implements org.apache.hadoop.mapred.RecordReader<Long, Row>
-{
-    private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
-
-    private ColumnFamilySplit split;
-    private RowIterator rowIterator;
-
-    private Pair<Long, Row> currentRow;
-    private int totalRowCount; // total number of rows to fetch
-    private String keyspace;
-    private String cfName;
-    private String cqlQuery;
-    private Cluster cluster;
-    private Session session;
-    private IPartitioner partitioner;
-
-    // partition keys -- key aliases
-    private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
-
-    public CqlRecordReader()
-    {
-        super();
-    }
-
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
-    {
-        this.split = (ColumnFamilySplit) split;
-        Configuration conf = context.getConfiguration();
-        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
-                      ? (int) this.split.getLength()
-                      : ConfigHelper.getInputSplitSize(conf);
-        cfName = ConfigHelper.getInputColumnFamily(conf);
-        keyspace = ConfigHelper.getInputKeyspace(conf);              
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
-        try
-        {
-            if (cluster != null)
-                return;
-
-            // create connection using thrift
-            String[] locations = split.getLocations();
-            Exception lastException = null;
-            for (String location : locations)
-            {
-                try
-                {
-                    cluster = CqlConfigHelper.getInputCluster(location, conf);
-                    break;
-                }
-                catch (Exception e)
-                {
-                    lastException = e;
-                    logger.warn("Failed to create authenticated client to {}", location);
-                }
-            }
-            if (cluster == null && lastException != null)
-                throw lastException;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        session = cluster.connect(keyspace);
-        rowIterator = new RowIterator();
-        logger.debug("created {}", rowIterator);
-    }
-
-    public void close()
-    {
-        if (session != null)
-            session.close();
-    }
-
-    public Long getCurrentKey()
-    {
-        return currentRow.left;
-    }
-
-    public Row getCurrentValue()
-    {
-        return currentRow.right;
-    }
-
-    public float getProgress()
-    {
-        if (!rowIterator.hasNext())
-            return 1.0F;
-
-        // the progress is likely to be reported slightly off the actual but close enough
-        float progress = ((float) rowIterator.totalRead / totalRowCount);
-        return progress > 1.0F ? 1.0F : progress;
-    }
-
-    public boolean nextKeyValue() throws IOException
-    {
-        if (!rowIterator.hasNext())
-        {
-            logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount);
-            return false;
-        }
-
-        try
-        {
-            currentRow = rowIterator.next();
-        }
-        catch (Exception e)
-        {
-            // throw it as IOException, so client can catch it and handle it at client side
-            IOException ioe = new IOException(e.getMessage());
-            ioe.initCause(ioe.getCause());
-            throw ioe;
-        }
-        return true;
-    }
-
-    // Because the old Hadoop API wants us to write to the key and value
-    // and the new asks for them, we need to copy the output of the new API
-    // to the old. Thus, expect a small performance hit.
-    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
-    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(Long key, Row value) throws IOException
-    {
-        if (nextKeyValue())
-        {
-            key = getCurrentKey();
-            value = getCurrentValue();
-            return true;
-        }
-        return false;
-    }
-
-    public long getPos() throws IOException
-    {
-        return (long) rowIterator.totalRead;
-    }
-
-    public Long createKey()
-    {
-        return null;
-    }
-
-    public Row createValue()
-    {
-        return null;
-    }
-
-    /** CQL row iterator 
-     *  Input cql query  
-     *  1) select clause must include key columns (if we use partition key based row count)
-     *  2) where clause must include token(partition_key1 ... partition_keyn) > ? and 
-     *     token(partition_key1 ... partition_keyn) <= ? 
-     */
-    private class RowIterator extends AbstractIterator<Pair<Long, Row>>
-    {
-        private long keyId = 0L;
-        protected int totalRead = 0; // total number of cf rows read
-        protected Iterator<Row> rows;
-        private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key
-
-        public RowIterator()
-        {
-            AbstractType type = partitioner.getTokenValidator();
-            ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
-            for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
-                partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
-            rows = rs.iterator();
-        }
-
-        protected Pair<Long, Row> computeNext()
-        {
-            if (rows == null || !rows.hasNext())
-                return endOfData();
-
-            Row row = rows.next();
-            Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>(); 
-            for (String column : partitionBoundColumns.keySet())
-                keyColumns.put(column, row.getBytesUnsafe(column));
-
-            // increase total CF row read
-            if (previousRowKey.isEmpty() && !keyColumns.isEmpty())
-            {
-                previousRowKey = keyColumns;
-                totalRead++;
-            }
-            else
-            {
-                for (String column : partitionBoundColumns.keySet())
-                {
-                    if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0)
-                    {
-                        previousRowKey = keyColumns;
-                        totalRead++;
-                        break;
-                    }
-                }
-            }
-            keyId ++;
-            return Pair.create(keyId, row);
-        }
-    }
-}
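
The token-range contract spelled out in the reader's javadoc above is exactly what the WordCount example (also reverted in this commit) satisfied with its input CQL; for the record:

    CqlConfigHelper.setInputCql(job.getConfiguration(),
        "select * from inputs where token(id) > ? and token(id) <= ? allow filtering");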


[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	build.xml


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6df31637
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6df31637
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6df31637

Branch: refs/heads/cassandra-2.1
Commit: 6df316375209410fbfe1e40059d25c915fa5fbb5
Parents: 43092aa b0841d8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 25 16:52:21 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 25 16:52:21 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 build.xml                                       |   3 -
 examples/hadoop_cql3_word_count/bin/word_count  |   3 +-
 .../bin/word_count_counters                     |   4 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |  77 +--
 .../src/WordCountCounters.java                  |  54 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  | 541 +------------------
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  80 ---
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 260 ---------
 10 files changed, 29 insertions(+), 996 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/build.xml
----------------------------------------------------------------------
diff --cc build.xml
index 6656015,464dece..83ce003
--- a/build.xml
+++ b/build.xml
@@@ -383,12 -377,9 +383,11 @@@
            <dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" />
            <dependency groupId="com.yammer.metrics" artifactId="metrics-core" version="2.2.0" />
            <dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
 -          <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
            <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
 +          <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
            <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
 +          <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
 +          <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
-           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
            <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
          </dependencyManagement>
          <developer id="alakshman" name="Avinash Lakshman"/>
@@@ -419,8 -410,8 +418,7 @@@
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
        	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
          <dependency groupId="org.apache.pig" artifactId="pig"/>
 -
 -        <dependency groupId="net.java.dev.jna" artifactId="jna"/>
 +      	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
-         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
        </artifact:pom>
  
        <artifact:pom id="coverage-deps-pom"
@@@ -486,10 -473,9 +484,9 @@@
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
          <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
-       	<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
  
          <!-- don't need jna to run, but nice to have -->
 -        <dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/>
 +        <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/>
          
          <!-- don't need jamm unless running a server in which case it needs to be a -javagent to be used anyway -->
          <dependency groupId="com.github.stephenc" artifactId="jamm"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6df31637/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------