Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/06 21:19:14 UTC

[02/10] Add CQL3 input/output formats patch by Alex Liu; reviewed by jbellis and Mike Schrag for CASSANDRA-4421

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index e95e7ad..f77352a 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -18,35 +18,14 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TApplicationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
 
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TException;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -65,283 +44,14 @@ import org.apache.thrift.TException;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
-    implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-
-    public static final String MAPRED_TASK_ID = "mapred.task.id";
-    // The simple fact that we need this is because the old Hadoop API wants us to "write"
-    // to the key and value whereas the new asks for it.
-    // I choose 8kb as the default max key size (instanciated only once), but you can
-    // override it in your jobConf with this setting.
-    public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
-    public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
-
-    private String keyspace;
-    private String cfName;
-    private IPartitioner partitioner;
-
-    private static void validateConfiguration(Configuration conf)
-    {
-        if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setInputColumnFamily()");
-        }
-        if (ConfigHelper.getInputSlicePredicate(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
-        }
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
-    }
-
-    public List<InputSplit> getSplits(JobContext context) throws IOException
-    {
-        Configuration conf = context.getConfiguration();
-
-        validateConfiguration(conf);
-
-        // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
-        keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
-        cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
-        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
-        logger.debug("partitioner is " + partitioner);
-
-        // cannonical ranges, split into pieces, fetching the splits in parallel
-        ExecutorService executor = Executors.newCachedThreadPool();
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-
-        try
-        {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            Range<Token> jobRange = null;
-            if (jobKeyRange != null)
-            {
-                if (jobKeyRange.start_key == null)
-                {
-                    logger.warn("ignoring jobKeyRange specified without start_key");
-                }
-                else
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    jobRange = new Range<Token>(partitioner.getToken(jobKeyRange.start_key),
-                                                partitioner.getToken(jobKeyRange.end_key),
-                                                partitioner);
-                }
-            }
-
-            for (TokenRange range : masterRangeNodes)
-            {
-                if (jobRange == null)
-                {
-                    // for each range, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                }
-                else
-                {
-                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
-                                                             partitioner.getTokenFactory().fromString(range.end_token),
-                                                             partitioner);
-
-                    if (dhtRange.intersects(jobRange))
-                    {
-                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
-                        {
-                            range.start_token = partitioner.getTokenFactory().toString(intersection.left);
-                            range.end_token = partitioner.getTokenFactory().toString(intersection.right);
-                            // for each range, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                        }
-                    }
-                }
-            }
-
-            // wait until we have all the results back
-            for (Future<List<InputSplit>> futureInputSplits : splitfutures)
-            {
-                try
-                {
-                    splits.addAll(futureInputSplits.get());
-                }
-                catch (Exception e)
-                {
-                    throw new IOException("Could not get input splits", e);
-                }
-            }
-        }
-        finally
-        {
-            executor.shutdownNow();
-        }
-
-        assert splits.size() > 0;
-        Collections.shuffle(splits, new Random(System.nanoTime()));
-        return splits;
-    }
-
-    /**
-     * Gets a token range and splits it up according to the suggested
-     * size into input splits that Hadoop can use.
-     */
-    class SplitCallable implements Callable<List<InputSplit>>
-    {
-
-        private final TokenRange range;
-        private final Configuration conf;
-
-        public SplitCallable(TokenRange tr, Configuration conf)
-        {
-            this.range = tr;
-            this.conf = conf;
-        }
-
-        public List<InputSplit> call() throws Exception
-        {
-            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
-            assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
-            // turn the sub-ranges into InputSplits
-            String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
-            // hadoop needs hostname, not ip
-            int endpointIndex = 0;
-            for (String endpoint: range.rpc_endpoints)
-            {
-                String endpoint_address = endpoint;
-                if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
-                    endpoint_address = range.endpoints.get(endpointIndex);
-                endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
-            }
-
-            Token.TokenFactory factory = partitioner.getTokenFactory();
-            for (CfSplit subSplit : subSplits)
-            {
-                Token left = factory.fromString(subSplit.getStart_token());
-                Token right = factory.fromString(subSplit.getEnd_token());
-                Range<Token> range = new Range<Token>(left, right, partitioner);
-                List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
-                for (Range<Token> subrange : ranges)
-                {
-                    ColumnFamilySplit split =
-                            new ColumnFamilySplit(
-                                    factory.toString(subrange.left),
-                                    factory.toString(subrange.right),
-                                    subSplit.getRow_count(),
-                                    endpoints);
-
-                    logger.debug("adding " + split);
-                    splits.add(split);
-                }
-            }
-            return splits;
-        }
-    }
-
-    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
-    {
-        int splitsize = ConfigHelper.getInputSplitSize(conf);
-        for (int i = 0; i < range.rpc_endpoints.size(); i++)
-        {
-            String host = range.rpc_endpoints.get(i);
-
-            if (host == null || host.equals("0.0.0.0"))
-                host = range.endpoints.get(i);
-
-            try
-            {
-                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
-                client.set_keyspace(keyspace);
-
-                try
-                {
-                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
-                }
-                catch (TApplicationException e)
-                {
-                    // fallback to guessing split size if talking to a server without describe_splits_ex method
-                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
-                    {
-                        List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
-                        return tokenListToSplits(splitPoints, splitsize);
-                    }
-                    throw e;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.debug("failed connect to endpoint " + host, e);
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
-    }
-
-
-    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
-    {
-        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
-        for (int j = 0; j < splitTokens.size() - 1; j++)
-            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
-        return splits;
-    }
-
-
-    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
-    {
-        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
-        List<TokenRange> map;
-        try
-        {
-            map = client.describe_ring(ConfigHelper.getInputKeyspace(conf));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return map;
-    }
-
+    
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-
-    //
-    // Old Hadoop API
-    //
-    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
-    {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
-        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
-        org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
-        for (int i = 0; i < newInputSplits.size(); i++)
-            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
-        return oldInputSplits;
-    }
-
     public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
@@ -357,5 +67,16 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
         return recordReader;
     }
+    
+    @Override
+    protected void validateConfiguration(Configuration conf)
+    {
+        super.validateConfiguration(conf);
+        
+        if (ConfigHelper.getInputSlicePredicate(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
+        }
+    }
 
 }
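
The settings that validateConfiguration() requires are supplied on the job-submission side through ConfigHelper; a minimal sketch, assuming a single local node and placeholder keyspace/column family/column names:

    import java.util.Arrays;
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class InputConfigSketch
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "cassandra-input-sketch"); // hypothetical job name
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            Configuration conf = job.getConfiguration();
            // initial address, partitioner and keyspace/column family are exactly what
            // the validateConfiguration() checks shown above insist on
            ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");            // any live node
            ConfigHelper.setInputRpcPort(conf, "9160");                        // Thrift rpc_port
            ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner");      // must match the cluster
            ConfigHelper.setInputColumnFamily(conf, "Keyspace1", "Standard1"); // placeholder names
            // the slice predicate is the extra requirement this subclass re-adds in its override
            SlicePredicate predicate = new SlicePredicate()
                    .setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));
            ConfigHelper.setInputSlicePredicate(conf, predicate);
        }
    }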

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index b3cd516..724ba7d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -20,22 +20,9 @@ package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.thrift.*;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-
-import javax.security.auth.login.LoginException;
 
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
@@ -58,63 +45,11 @@ import javax.security.auth.login.LoginException;
  * official by sending a batch mutate request to Cassandra.
  * </p>
  */
-public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
-    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>>
 {
-    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
-    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
-
-
-    /**
-     * Check for validity of the output-specification for the job.
-     *
-     * @param context
-     *            information about the job
-     * @throws IOException
-     *             when output should not be attempted
-     */
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(context.getConfiguration());
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-    }
-
-    /**
-     * The OutputCommitter for this format does not write any data to the DFS.
-     *
-     * @param context
-     *            the task context
-     * @return an output committer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
-    public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+    public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
     {
         return new ColumnFamilyRecordWriter(job, new Progressable(progress));
     }
@@ -127,62 +62,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
      * @return a {@link RecordWriter} to write the output for the job.
      * @throws IOException
      */
-    @Override
-    public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws InterruptedException
     {
         return new ColumnFamilyRecordWriter(context);
     }
-
-    /**
-     * Return a client based on the given socket that points to the configured
-     * keyspace, and is logged in with the configured credentials.
-     *
-     * @param socket  a socket pointing to a particular node, seed or otherwise
-     * @param conf a job configuration
-     * @return a cassandra client
-     * @throws InvalidRequestException
-     * @throws TException
-     * @throws AuthenticationException
-     * @throws AuthorizationException
-     */
-    public static Cassandra.Client createAuthenticatedClient(TSocket socket, Configuration conf)
-            throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException
-    {
-        logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket, conf);
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-        if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
-        {
-            Map<String, String> creds = new HashMap<String, String>();
-            creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
-            creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
-            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-            client.login(authRequest);
-        }
-        logger.debug("Authenticated client for CF output format created successfully");
-        return client;
-    }
-
-    /**
-     * An {@link OutputCommitter} that does nothing.
-     */
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
 }
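
The keyspace/partitioner/initial-address checks removed here (presumably moving to AbstractColumnFamilyOutputFormat, which is not part of this hunk) map to job-side configuration roughly as below; a hedged sketch with placeholder names:

    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputConfigSketch
    {
        static void configureOutput(Job job)
        {
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            job.setOutputKeyClass(ByteBuffer.class);   // row key
            job.setOutputValueClass(List.class);       // List<Mutation>

            // mirrors the three checkOutputSpecs() requirements shown above
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
            ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");
        }
    }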

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 30abdd5..daef8ec 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,11 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
-import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -43,7 +41,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
     implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
@@ -59,14 +57,14 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
     private int batchSize; // fetch this many per batch
-    private String cfName;
     private String keyspace;
-    private TSocket socket;
+    private String cfName;
     private Cassandra.Client client;
     private ConsistencyLevel consistencyLevel;
     private int keyBufferSize = 8192;
     private List<IndexExpression> filter;
 
+
     public ColumnFamilyRecordReader()
     {
         this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT);
@@ -80,11 +78,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     public void close()
     {
-        if (socket != null && socket.isOpen())
+        if (client != null)
         {
-            socket.close();
-            socket = null;
-            client = null;
+            TTransport transport = client.getOutputProtocol().getTransport();
+            if (transport.isOpen())
+                transport.close();
         }
     }
 
@@ -139,36 +137,25 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         predicate = ConfigHelper.getInputSlicePredicate(conf);
         boolean widerows = ConfigHelper.getInputIsWide(conf);
         isEmptyPredicate = isEmptyPredicate(predicate);
-        totalRowCount = (int) this.split.getLength();
+        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
+                ? (int) this.split.getLength()
+                : ConfigHelper.getInputSplitSize(conf);
         batchSize = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
-
         keyspace = ConfigHelper.getInputKeyspace(conf);
 
         try
         {
-            // only need to connect once
-            if (socket != null && socket.isOpen())
+            if (client != null)
                 return;
 
             // create connection using thrift
             String location = getLocation();
-            socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
-            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf);
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-            client = new Cassandra.Client(binaryProtocol);
-
-            // log in
-            client.set_keyspace(keyspace);
-            if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
-            {
-                Map<String, String> creds = new HashMap<String, String>();
-                creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
-                creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
-                AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                client.login(authRequest);
-            }
+
+            int port = ConfigHelper.getInputRpcPort(conf);
+            client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf);
+
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 909c291..50ec059 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -22,21 +22,14 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TSocket;
 
 
 /**
@@ -52,33 +45,12 @@ import org.apache.thrift.transport.TSocket;
  * </p>
  *
  * @see ColumnFamilyOutputFormat
- * @see OutputFormat
- *
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
 {
-    // The configuration this writer is associated with.
-    private final Configuration conf;
-
-    // The ring cache that describes the token ranges each node in the ring is
-    // responsible for. This is what allows us to group the mutations by
-    // the endpoints they should be targeted at. The targeted endpoint
-    // essentially
-    // acts as the primary replica for the rows being affected by the mutations.
-    private final RingCache ringCache;
-
-    // The number of mutations to buffer per endpoint
-    private final int queueSize;
-
     // handles for clients for each range running in the threadpool
-    private final Map<Range,RangeClient> clients;
-    private final long batchThreshold;
-
-    private final ConsistencyLevel consistencyLevel;
-    private Progressable progressable;
-
-
+    private final Map<Range, RangeClient> clients;
+    
     /**
      * Upon construction, obtain the map that this writer will use to collect
      * mutations, and the ring cache for the given keyspace.
@@ -86,28 +58,44 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
      * @param context the task attempt context
      * @throws IOException
      */
-    ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
+    ColumnFamilyRecordWriter(TaskAttemptContext context)
     {
         this(context.getConfiguration());
         this.progressable = new Progressable(context);
     }
 
-    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException
+    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable)
     {
         this(conf);
         this.progressable = progressable;
     }
 
-    ColumnFamilyRecordWriter(Configuration conf) throws IOException
+    ColumnFamilyRecordWriter(Configuration conf)
     {
-        this.conf = conf;
-        this.ringCache = new RingCache(conf);
-        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
-        this.clients = new HashMap<Range,RangeClient>();
-        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
-        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+        super(conf);
+        this.clients = new HashMap<Range, RangeClient>();
     }
-
+    
+    @Override
+    public void close() throws IOException
+    {
+        // close all the clients before throwing anything
+        IOException clientException = null;
+        for (RangeClient client : clients.values())
+        {
+            try
+            {
+                client.close();
+            }
+            catch (IOException e)
+            {
+                clientException = e;
+            }
+        }
+        if (clientException != null)
+            throw clientException;
+    }
+    
     /**
      * If the key is to be associated with a valid value, a mutation is created
      * for it with the given column family and columns. In the event the value
@@ -143,124 +131,22 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     }
 
     /**
-     * Close this <code>RecordWriter</code> to future operations, but not before
-     * flushing out the batched mutations.
-     *
-     * @param context the context of the task
-     * @throws IOException
-     */
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        // close all the clients before throwing anything
-        IOException clientException = null;
-        for (RangeClient client : clients.values())
-        {
-            try
-            {
-                client.close();
-            }
-            catch (IOException e)
-            {
-                clientException = e;
-            }
-        }
-        if (clientException != null)
-            throw clientException;
-    }
-
-    /**
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends Thread
+    public class RangeClient extends AbstractRangeClient<Mutation>
     {
-        // The list of endpoints for this range
-        private final List<InetAddress> endpoints;
-        private final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
-        // A bounded queue of incoming mutations for this range
-        private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
-
-        private volatile boolean run = true;
-        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
-        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
-        // when the client is closed.
-        private volatile IOException lastException;
-
-        private Cassandra.Client thriftClient;
-        private TSocket thriftSocket;
-
+        public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        
         /**
          * Constructs an {@link RangeClient} for the given endpoints.
          * @param endpoints the possible endpoints to execute the mutations on
          */
         public RangeClient(List<InetAddress> endpoints)
         {
-            super("client-" + endpoints);
-            this.endpoints = endpoints;
+            super(endpoints);
          }
-
-        /**
-         * enqueues the given value to Cassandra
-         */
-        public void put(Pair<ByteBuffer,Mutation> value) throws IOException
-        {
-            while (true)
-            {
-                if (lastException != null)
-                    throw lastException;
-                try
-                {
-                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
-                        break;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
-        }
-
-        public void close() throws IOException
-        {
-            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
-            run = false;
-            interrupt();
-            try
-            {
-                this.join();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            if (lastException != null)
-                throw lastException;
-        }
-
-        private void closeInternal()
-        {
-            if (thriftSocket != null)
-            {
-                thriftSocket.close();
-                thriftSocket = null;
-                thriftClient = null;
-            }
-        }
-
+        
         /**
          * Loops collecting mutations from the queue and sending to Cassandra
          */
@@ -303,7 +189,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
                     try
                     {
-                        thriftClient.batch_mutate(batch, consistencyLevel);
+                        client.batch_mutate(batch, consistencyLevel);
                         break;
                     }
                     catch (Exception e)
@@ -320,8 +206,9 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     try
                     {
                         InetAddress address = iter.next();
-                        thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getOutputRpcPort(conf));
-                        thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf);
+                        String host = address.getHostName();
+                        int port = ConfigHelper.getOutputRpcPort(conf);
+                        client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
                     }
                     catch (Exception e)
                     {
@@ -337,11 +224,5 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 }
             }
         }
-
-        @Override
-        public String toString()
-        {
-            return "#<Client for " + endpoints.toString() + ">";
-        }
     }
 }
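
For context, the pairs that eventually land in a RangeClient's queue are produced by reduce tasks writing a row key plus a list of Thrift mutations; a minimal reducer-side sketch (column name and key handling are illustrative, not part of the patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Mutation;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MutationEmittingReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable v : values)
                sum += v.get();

            Column c = new Column();
            c.setName(ByteBufferUtil.bytes("count"));    // illustrative column name
            c.setValue(ByteBufferUtil.bytes(sum));
            c.setTimestamp(System.currentTimeMillis());

            Mutation m = new Mutation();
            m.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(c));

            // ColumnFamilyRecordWriter.write() groups this by the token range owning the key
            // and queues it on the corresponding RangeClient for batch_mutate
            context.write(ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));
        }
    }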

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
index ed1f160..69c7ddb 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -79,7 +80,6 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
     {
         out.writeUTF(startToken);
         out.writeUTF(endToken);
-        out.writeLong(length);
         out.writeInt(dataNodes.length);
         for (String endpoint : dataNodes)
         {
@@ -91,8 +91,6 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
     {
         startToken = in.readUTF();
         endToken = in.readUTF();
-        length = in.readLong();
-
         int numOfEndpoints = in.readInt();
         dataNodes = new String[numOfEndpoints];
         for(int i = 0; i < numOfEndpoints; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 0d12812..3dcfdd7 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -21,11 +21,9 @@ package org.apache.cassandra.hadoop;
  */
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.google.common.collect.Maps;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,11 +38,8 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import javax.security.auth.login.LoginException;
 
 
 public class ConfigHelper
@@ -74,6 +69,7 @@ public class ConfigHelper
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
+
     private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
     private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
     private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
@@ -495,19 +491,13 @@ public class ConfigHelper
 
     /**
      * @param conf The configuration to use.
-     * @return Value (converts MBs to Bytes) set by {@link setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB
+     * @return Value (converts MBs to Bytes) set by {@link #setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB
      */
     public static int getThriftFramedTransportSize(Configuration conf)
     {
         return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
     }
 
-    @Deprecated
-    public static void setThriftMaxMessageLengthInMb(Configuration conf, int maxMessageSizeInMB)
-    {
-        // SEE CASSANDRA-5529
-    }
-
     public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
     {
         if (getOutputCompressionClass(conf) == null)
@@ -567,48 +557,44 @@ public class ConfigHelper
     {
         try
         {
-            TSocket socket = new TSocket(host, port);
-            TTransport transport = getInputTransportFactory(conf).openTransport(socket, conf);
+            TTransport transport = getClientTransportFactory(conf).openTransport(host, port, conf);
             return new Cassandra.Client(new TBinaryProtocol(transport, true, true));
         }
-        catch (LoginException e)
-        {
-            throw new IOException("Unable to login to server " + host + ":" + port, e);
-        }
-        catch (TTransportException e)
+        catch (Exception e)
         {
             throw new IOException("Unable to connect to server " + host + ":" + port, e);
         }
     }
 
-    public static ITransportFactory getInputTransportFactory(Configuration conf)
-    {
-        return getTransportFactory(conf.get(INPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName()));
-    }
-
-    public static void setInputTransportFactoryClass(Configuration conf, String classname)
-    {
-        conf.set(INPUT_TRANSPORT_FACTORY_CLASS, classname);
-    }
-
-    public static ITransportFactory getOutputTransportFactory(Configuration conf)
-    {
-        return getTransportFactory(conf.get(OUTPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName()));
-    }
-
-    public static void setOutputTransportFactoryClass(Configuration conf, String classname)
+    public static TClientTransportFactory getClientTransportFactory(Configuration conf)
     {
-        conf.set(OUTPUT_TRANSPORT_FACTORY_CLASS, classname);
+        String factoryClassName = conf.get(
+                TClientTransportFactory.PROPERTY_KEY,
+                TFramedTransportFactory.class.getName());
+        TClientTransportFactory factory = getClientTransportFactory(factoryClassName);
+        Map<String, String> options = getOptions(conf, factory.supportedOptions());
+        factory.setOptions(options);
+        return factory;
     }
 
-    private static ITransportFactory getTransportFactory(String factoryClassName) {
+    private static TClientTransportFactory getClientTransportFactory(String factoryClassName) {
         try
         {
-            return (ITransportFactory) Class.forName(factoryClassName).newInstance();
+            return (TClientTransportFactory) Class.forName(factoryClassName).newInstance();
         }
         catch (Exception e)
         {
             throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e);
         }
     }
+    private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions) {
+        Map<String, String> options = Maps.newHashMap();
+        for (String optionKey : supportedOptions)
+        {
+            String optionValue = conf.get(optionKey);
+            if (optionValue != null)
+                options.put(optionKey, optionValue);
+        }
+        return options;
+    }
 }
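
The new getClientTransportFactory() resolves the factory class from a single configuration key and copies through any options the factory declares via supportedOptions(). Wiring in a custom factory would look roughly like this, given a Hadoop Configuration named conf (the factory class and option key below are invented for illustration):

    // TClientTransportFactory.PROPERTY_KEY is the lookup key read above;
    // TFramedTransportFactory remains the default when it is not set
    conf.set(TClientTransportFactory.PROPERTY_KEY, "com.example.hadoop.SecureTransportFactory"); // hypothetical class
    // any key returned by that factory's supportedOptions() is picked up by getOptions()
    conf.set("com.example.ssl.truststore", "/path/to/truststore");                               // hypothetical option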

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/Progressable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/Progressable.java b/src/java/org/apache/cassandra/hadoop/Progressable.java
index 091a828..ac253ef 100644
--- a/src/java/org/apache/cassandra/hadoop/Progressable.java
+++ b/src/java/org/apache/cassandra/hadoop/Progressable.java
@@ -29,12 +29,12 @@ public class Progressable
     private TaskAttemptContext context;
     private org.apache.hadoop.util.Progressable progressable;
 
-    Progressable(TaskAttemptContext context)
+    public Progressable(TaskAttemptContext context)
     {
         this.context = context;
     }
 
-    Progressable(org.apache.hadoop.util.Progressable progressable)
+    public Progressable(org.apache.hadoop.util.Progressable progressable)
     {
         this.progressable = progressable;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
new file mode 100644
index 0000000..66bcfdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CQLConfigHelper.java
@@ -0,0 +1,109 @@
+package org.apache.cassandra.hadoop.cql3;
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+import org.apache.hadoop.conf.Configuration;
+
+public class CQLConfigHelper
+{
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
+    private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
+    private static final String OUTPUT_CQL = "cassandra.output.cql";
+
+    /**
+     * Set the CQL columns for the input of this job.
+     *
+     * @param conf Job configuration you are about to run
+     * @param columns
+     */
+    public static void setInputColumns(Configuration conf, String columns)
+    {
+        if (columns == null || columns.isEmpty())
+            return;
+        
+        conf.set(INPUT_CQL_COLUMNS_CONFIG, columns);
+    }
+    
+    /**
+     * Set the CQL query Limit for the input of this job.
+     *
+     * @param conf Job configuration you are about to run
+     * @param cqlPageRowSize
+     */
+    public static void setInputCQLPageRowSize(Configuration conf, String cqlPageRowSize)
+    {
+        if (cqlPageRowSize == null)
+        {
+            throw new UnsupportedOperationException("cql page row size may not be null");
+        }
+
+        conf.set(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, cqlPageRowSize);
+    }
+
+    /**
+     * Set the CQL user defined where clauses for the input of this job.
+     *
+     * @param conf Job configuration you are about to run
+     * @param clauses
+     */
+    public static void setInputWhereClauses(Configuration conf, String clauses)
+    {
+        if (clauses == null || clauses.isEmpty())
+            return;
+        
+        conf.set(INPUT_CQL_WHERE_CLAUSE_CONFIG, clauses);
+    }
+  
+    /**
+     * Set the CQL prepared statement for the output of this job.
+     *
+     * @param conf Job configuration you are about to run
+     * @param cql
+     */
+    public static void setOutputCql(Configuration conf, String cql)
+    {
+        if (cql == null || cql.isEmpty())
+            return;
+        
+        conf.set(OUTPUT_CQL, cql);
+    }
+    
+    
+    public static String getInputcolumns(Configuration conf)
+    {
+        return conf.get(INPUT_CQL_COLUMNS_CONFIG);
+    }
+    
+    public static String getInputPageRowSize(Configuration conf)
+    {
+        return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
+    }
+    
+    public static String getInputWhereClauses(Configuration conf)
+    {
+        return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
+    }
+    
+    public static String getOutputCql(Configuration conf)
+    {
+        return conf.get(OUTPUT_CQL);
+    }
+}
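
Taken together, the setters above configure the CQL side of a job roughly as follows, given a Hadoop Configuration named conf (the values, column name, and statement are illustrative; the keyspace and table themselves still come from ConfigHelper.setInputColumnFamily):

    CQLConfigHelper.setInputCQLPageRowSize(conf, "1000");     // LIMIT applied to each paged query
    CQLConfigHelper.setInputColumns(conf, "body");            // default is all columns
    CQLConfigHelper.setInputWhereClauses(conf, "month = 6");  // optional user-defined restriction
    CQLConfigHelper.setOutputCql(conf, "UPDATE example_ks.counts SET total = ?"); // prepared statement for output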

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
new file mode 100644
index 0000000..525ed89
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyInputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
+ *
+ * At minimum, you need to set the KS and CF in your Hadoop job Configuration.  
+ * The ConfigHelper class is provided to make this
+ * simple:
+ *   ConfigHelper.setInputColumnFamily
+ *
+ * You can also configure the number of rows per InputSplit with
+ *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *
+ *   the number of CQL rows per page
+ *   CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
+ *   should set it to "as big as possible, but no bigger." It sets the LIMIT for the CQL
+ *   query, so set it big enough to minimize the network overhead, but not so big that
+ *   it risks running out of memory.
+ *
+ *   the column names of the select CQL query. The default is all columns.
+ *   CQLConfigHelper.setInputColumns
+ *
+ *   the user-defined where clause
+ *   CQLConfigHelper.setInputWhereClauses. The default is no user-defined where clause.
+ */
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+{
+    public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
+            throws IOException
+    {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+        {
+            @Override
+            public void progress()
+            {
+                reporter.progress();
+            }
+        };
+
+        ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader();
+        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
+        return recordReader;
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader(
+            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+            InterruptedException
+    {
+        return new ColumnFamilyRecordReader();
+    }
+
+}
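
Since the key and value types are both Map<String, ByteBuffer>, a mapper over this format receives each CQL row as two maps of column name to raw value; a minimal sketch (the "body" column name is a placeholder, and its value is assumed to be text):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CqlRowMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
    {
        public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context)
                throws IOException, InterruptedException
        {
            // column values arrive as raw ByteBuffers and must be decoded with the column's type
            ByteBuffer body = columns.get("body");
            if (body != null)
                context.write(new Text(ByteBufferUtil.string(body)), new IntWritable(1));
        }
    }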

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
new file mode 100644
index 0000000..3f6e2af
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyOutputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.Progressable;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * bound variable values) as CQL rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
+ * prepared statement in your Hadoop job Configuration. The {@link CQLConfigHelper}
+ * class, through its {@link CQLConfigHelper#setOutputCql} method, is provided to
+ * make this simple. You also need to set the keyspace and column family; the
+ * {@link ConfigHelper} class, through its {@link ConfigHelper#setOutputColumnFamily}
+ * method, is provided to make this simple.
+ * </p>
+ *
+ * <p>
+ * For the sake of performance, this class employs a lazy write-back caching
+ * mechanism, where its record writer collects the bound variable values created from
+ * the reduce step's inputs (in a task-specific map), and periodically makes the
+ * changes official by sending a prepared statement execution request to Cassandra.
+ * </p>
+ */
+public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{   
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+    {
+        return new ColumnFamilyRecordWriter(job, new Progressable(progress));
+    }
+
+    /**
+     * Get the {@link RecordWriter} for the given task.
+     *
+     * @param context
+     *            the information about the current task.
+     * @return a {@link RecordWriter} to write the output for the job.
+     * @throws IOException
+     */
+    public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new ColumnFamilyRecordWriter(context);
+    }
+}
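
On the output side, a reducer feeding this format emits a Map<String, ByteBuffer> of key column values plus a List<ByteBuffer> of bound variable values, matching the generic parameters above. A minimal sketch under stated assumptions: the prepared statement, the column name "id", and keying the map by key column name are illustrative assumptions, not part of this patch:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative only: assumes an output prepared statement such as
    // "UPDATE ks.counts SET total = ? WHERE id = ?" has been set in the job Configuration.
    public class CountReducer extends Reducer<Text, LongWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        public void reduce(Text id, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException
        {
            long total = 0;
            for (LongWritable v : values)
                total += v.get();

            // key columns of the target row (hypothetical column name "id")
            Map<String, ByteBuffer> keys = new LinkedHashMap<String, ByteBuffer>();
            keys.put("id", ByteBufferUtil.bytes(id.toString()));

            // bound variable values for the prepared statement, in order
            List<ByteBuffer> variables = Collections.singletonList(ByteBufferUtil.bytes(total));
            context.write(keys, variables);
        }
    }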

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e0ad1b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
new file mode 100644
index 0000000..03d7af5
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordReader.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * A Hadoop RecordReader that reads the rows returned by a CQL query,
+ * using CQL token range queries to page through wide rows.
+ * <p/>
+ * The key is a Map<String, ByteBuffer> of the partition key and cluster key columns;
+ * <p/>
+ * the value is a Map<String, ByteBuffer> of the remaining columns of the CQL row.
+ */
+public class ColumnFamilyRecordReader extends RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+        implements org.apache.hadoop.mapred.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; // TODO: find a page size that is large enough without risking OOM
+
+    private ColumnFamilySplit split;
+    private RowIterator rowIterator;
+
+    private Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> currentRow;
+    private int totalRowCount; // total number of rows to fetch
+    private String keyspace;
+    private String cfName;
+    private Cassandra.Client client;
+    private ConsistencyLevel consistencyLevel;
+
+    // partition keys -- key aliases
+    private List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>();
+
+    // cluster keys -- column aliases
+    private List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>();
+
+    // map prepared query type to item id
+    private Map<Integer, Integer> preparedQueryIds = new HashMap<Integer, Integer>();
+
+    // cql query select columns
+    private String columns;
+
+    // the number of cql rows per page
+    private int pageRowSize;
+
+    // user defined where clauses
+    private String userDefinedWhereClauses;
+
+    private IPartitioner partitioner;
+
+    private AbstractType<?> keyValidator;
+
+    public ColumnFamilyRecordReader()
+    {
+        super();
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+    {
+        this.split = (ColumnFamilySplit) split;
+        Configuration conf = context.getConfiguration();
+        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
+                      ? (int) this.split.getLength()
+                      : ConfigHelper.getInputSplitSize(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
+        keyspace = ConfigHelper.getInputKeyspace(conf);
+        columns = CQLConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CQLConfigHelper.getInputWhereClauses(conf);
+
+        try
+        {
+            pageRowSize = Integer.parseInt(CQLConfigHelper.getInputPageRowSize(conf));
+        }
+        catch (NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
+
+        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+
+        try
+        {
+            if (client != null)
+                return;
+
+            // create connection using thrift
+            String location = getLocation();
+
+            int port = ConfigHelper.getInputRpcPort(conf);
+            client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf);
+
+            // retrieve partition keys and cluster keys from system.schema_columnfamilies table
+            retrieveKeys();
+
+            client.set_keyspace(keyspace);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        rowIterator = new RowIterator();
+
+        logger.debug("created {}", rowIterator);
+    }
+
+    public void close()
+    {
+        if (client != null)
+        {
+            TTransport transport = client.getOutputProtocol().getTransport();
+            if (transport.isOpen())
+                transport.close();
+            client = null;
+        }
+    }
+
+    public Map<String, ByteBuffer> getCurrentKey()
+    {
+        return currentRow.left;
+    }
+
+    public Map<String, ByteBuffer> getCurrentValue()
+    {
+        return currentRow.right;
+    }
+
+    public float getProgress()
+    {
+        if (!rowIterator.hasNext())
+            return 1.0F;
+
+        // the progress is likely to be reported slightly off the actual but close enough
+        float progress = ((float) rowIterator.totalRead / totalRowCount);
+        return progress > 1.0F ? 1.0F : progress;
+    }
+
+    public boolean nextKeyValue() throws IOException
+    {
+        if (!rowIterator.hasNext())
+        {
+            logger.debug("Finished scanning " + rowIterator.totalRead + " rows (estimate was: " + totalRowCount + ")");
+            return false;
+        }
+
+        try
+        {
+            currentRow = rowIterator.next();
+        }
+        catch (Exception e)
+        {
+            // rethrow as an IOException so the client can catch and handle it on its side
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
+        }
+        return true;
+    }
+
+    // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+    // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
+    private String getLocation()
+    {
+        Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
+
+        for (InetAddress address : localAddresses)
+        {
+            for (String location : split.getLocations())
+            {
+                InetAddress locationAddress;
+                try
+                {
+                    locationAddress = InetAddress.getByName(location);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new AssertionError(e);
+                }
+                if (address.equals(locationAddress))
+                {
+                    return location;
+                }
+            }
+        }
+        return split.getLocations()[0];
+    }
+
+    // Because the old Hadoop API wants us to write to the key and value
+    // and the new asks for them, we need to copy the output of the new API
+    // to the old. Thus, expect a small performance hit.
+    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
+    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
+    public boolean next(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> value) throws IOException
+    {
+        if (nextKeyValue())
+        {
+            value.clear();
+            value.putAll(getCurrentValue());
+            
+            keys.clear();
+            keys.putAll(getCurrentKey());
+
+            return true;
+        }
+        return false;
+    }
+
+    public long getPos() throws IOException
+    {
+        return (long) rowIterator.totalRead;
+    }
+
+    public Map<String, ByteBuffer> createKey()
+    {
+        return new LinkedHashMap<String, ByteBuffer>();
+    }
+
+    public Map<String, ByteBuffer> createValue()
+    {
+        return new LinkedHashMap<String, ByteBuffer>();
+    }
+
+    /** CQL row iterator */
+    private class RowIterator extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>>
+    {
+        protected int totalRead = 0;             // total number of cf rows read
+        protected Iterator<CqlRow> rows;
+        private int pageRows = 0;                // the number of cql rows read of this page
+        private String previousRowKey = null;    // previous CF row key
+        private String partitionKeyString;       // keys in <key1>, <key2>, <key3> string format
+        private String partitionKeyMarkers;      // question marks in ? , ? , ? format which matches the number of keys
+
+        public RowIterator()
+        {
+            // initial page
+            executeQuery();
+        }
+
+        protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext()
+        {
+            if (rows == null)
+                return endOfData();
+
+            int index = -2;
+            // check whether there are more pages to read
+            while (!rows.hasNext())
+            {
+                // no more data
+                if (index == -1 || emptyPartitionKeyValues())
+                {
+                    logger.debug("no more data.");
+                    return endOfData();
+                }
+
+                index = setTailNull(clusterColumns);
+                logger.debug("set tail to null, index: " + index);
+                executeQuery();
+                pageRows = 0;
+
+                if (rows == null || !rows.hasNext() && index < 0)
+                {
+                    logger.debug("no more data.");
+                    return endOfData();
+                }
+            }
+
+            Map<String, ByteBuffer> valueColumns = createValue();
+            Map<String, ByteBuffer> keyColumns = createKey();
+            int i = 0;
+            CqlRow row = rows.next();
+            for (Column column : row.columns)
+            {
+                String columnName = stringValue(ByteBuffer.wrap(column.getName()));
+                logger.debug("column: " + columnName);
+
+                if (i < partitionBoundColumns.size() + clusterColumns.size())
+                    keyColumns.put(stringValue(column.name), column.value);
+                else
+                    valueColumns.put(stringValue(column.name), column.value);
+
+                i++;
+            }
+
+            // increment the number of CQL rows read in this page
+            pageRows++;
+
+            // increment the number of CF rows read when we cross into a new partition
+            if (newRow(keyColumns, previousRowKey))
+                totalRead++;
+
+            // full page read (or page exhausted): remember the last-read key values so the next query continues from there
+            if (pageRows >= pageRowSize || !rows.hasNext())
+            {
+                Iterator<String> newKeys = keyColumns.keySet().iterator();
+                for (BoundColumn column : partitionBoundColumns)
+                    column.value = keyColumns.get(newKeys.next());
+
+                for (BoundColumn column : clusterColumns)
+                    column.value = keyColumns.get(newKeys.next());
+
+                executeQuery();
+                pageRows = 0;
+            }
+
+            return Pair.create(keyColumns, valueColumns);
+        }
+
+        /** check whether we have started reading a new CF row, by comparing partition keys */
+        private boolean newRow(Map<String, ByteBuffer> keyColumns, String previousRowKey)
+        {
+            if (keyColumns.isEmpty())
+                return false;
+
+            String rowKey = "";
+            if (keyColumns.size() == 1)
+            {
+                rowKey = partitionBoundColumns.get(0).validator.getString(keyColumns.get(partitionBoundColumns.get(0).name));
+            }
+            else
+            {
+                Iterator<ByteBuffer> iter = keyColumns.values().iterator();
+                for (BoundColumn column : partitionBoundColumns)
+                    rowKey = rowKey + column.validator.getString(ByteBufferUtil.clone(iter.next())) + ":";
+            }
+
+            logger.debug("previous RowKey: " + previousRowKey + ", new row key: " + rowKey);
+            if (previousRowKey == null)
+            {
+                this.previousRowKey = rowKey;
+                return true;
+            }
+
+            if (rowKey.equals(previousRowKey))
+                return false;
+
+            this.previousRowKey = rowKey;
+            return true;
+        }
+
+        /** set the last non-null key value to null, and return the previous index */
+        private int setTailNull(List<BoundColumn> values)
+        {
+            if (values.isEmpty())
+                return -1;
+
+            Iterator<BoundColumn> iterator = values.iterator();
+            int previousIndex = -1;
+            BoundColumn current;
+            while (iterator.hasNext())
+            {
+                current = iterator.next();
+                if (current.value == null)
+                {
+                    int index = previousIndex > 0 ? previousIndex : 0;
+                    BoundColumn column = values.get(index);
+                    logger.debug("set key " + column.name + " value to  null");
+                    column.value = null;
+                    return previousIndex - 1;
+                }
+
+                previousIndex++;
+            }
+
+            BoundColumn column = values.get(previousIndex);
+            logger.debug("set key " + column.name + " value to null");
+            column.value = null;
+            return previousIndex - 1;
+        }
+
+        /** compose the prepared query, pair.left is query id, pair.right is query */
+        private Pair<Integer, String> composeQuery(String columns)
+        {
+            Pair<Integer, String> clause = whereClause();
+            if (columns == null)
+            {
+                columns = "*";
+            }
+            else
+            {
+                // add keys in the front in order
+                String partitionKey = keyString(partitionBoundColumns);
+                String clusterKey = keyString(clusterColumns);
+
+                columns = withoutKeyColumns(columns);
+                columns = (clusterKey == null || "".equals(clusterKey))
+                        ? partitionKey + "," + columns
+                        : partitionKey + "," + clusterKey + "," + columns;
+            }
+
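+            // for example, with a single partition key "k", no explicitly selected columns and
+            // the default page size, the initial (type 0) query takes roughly the form:
+            //   SELECT * FROM <cf> WHERE token(k) > ? AND token(k) <= ? LIMIT 1000 ALLOW FILTERING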
+            return Pair.create(clause.left,
+                               "SELECT " + columns
+                               + " FROM " + cfName
+                               + clause.right
+                               + (userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses)
+                               + " LIMIT " + pageRowSize
+                               + " ALLOW FILTERING");
+        }
+
+
+        /** remove key columns from the column string */
+        private String withoutKeyColumns(String columnString)
+        {
+            Set<String> keyNames = new HashSet<String>();
+            for (BoundColumn column : Iterables.concat(partitionBoundColumns, clusterColumns))
+                keyNames.add(column.name);
+
+            String[] columns = columnString.split(",");
+            String result = null;
+            for (String column : columns)
+            {
+                String trimmed = column.trim();
+                if (keyNames.contains(trimmed))
+                    continue;
+
+                result = result == null ? trimmed : result + "," + trimmed;
+            }
+            return result;
+        }
+
+        /** compose the where clause */
+        private Pair<Integer, String> whereClause()
+        {
+            if (partitionKeyString == null)
+                partitionKeyString = keyString(partitionBoundColumns);
+
+            if (partitionKeyMarkers == null)
+                partitionKeyMarkers = partitionKeyMarkers();
+            // initial query token(k) >= start_token and token(k) <= end_token
+            if (emptyPartitionKeyValues())
+                return Pair.create(0, " WHERE token(" + partitionKeyString + ") > ? AND token(" + partitionKeyString + ") <= ?");
+
+            // query token(k) > token(pre_partition_key) and token(k) <= end_token
+            if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
+                return Pair.create(1,
+                                   " WHERE token(" + partitionKeyString + ") > token(" + partitionKeyMarkers + ") "
+                                   + " AND token(" + partitionKeyString + ") <= ?");
+
+            // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
+            Pair<Integer, String> clause = whereClause(clusterColumns, 0);
+            return Pair.create(clause.left,
+                               " WHERE token(" + partitionKeyString + ") = token(" + partitionKeyMarkers + ") " + clause.right);
+        }
+
+        /** recursively compose the where clause */
+        private Pair<Integer, String> whereClause(List<BoundColumn> column, int position)
+        {
+            if (position == column.size() - 1 || column.get(position + 1).value == null)
+                return Pair.create(position + 2, " AND " + column.get(position).name + " > ? ");
+
+            Pair<Integer, String> clause = whereClause(column, position + 1);
+            return Pair.create(clause.left, " AND " + column.get(position).name + " = ? " + clause.right);
+        }
+
+        /** check whether all key values are null */
+        private boolean emptyPartitionKeyValues()
+        {
+            for (BoundColumn column : partitionBoundColumns)
+            {
+                if (column.value != null)
+                    return false;
+            }
+            return true;
+        }
+
+        /** compose the partition key string in the format <key1>, <key2>, <key3> */
+        private String keyString(List<BoundColumn> columns)
+        {
+            String result = null;
+            for (BoundColumn column : columns)
+                result = result == null ? column.name : result + "," + column.name;
+
+            return result == null ? "" : result;
+        }
+
+        /** compose the question marks for the partition key string, in the format ?, ?, ? */
+        private String partitionKeyMarkers()
+        {
+            String result = null;
+            for (BoundColumn column : partitionBoundColumns)
+                result = result == null ? "?" : result + ",?";
+
+            return result;
+        }
+
+        /** compose the query binding variables, pair.left is query id, pair.right is the binding variables */
+        private Pair<Integer, List<ByteBuffer>> preparedQueryBindValues()
+        {
+            List<ByteBuffer> values = new LinkedList<ByteBuffer>();
+
+            // initial query token(k) >= start_token and token(k) <= end_token
+            if (emptyPartitionKeyValues())
+            {
+                values.add(partitioner.getTokenValidator().fromString(split.getStartToken()));
+                values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
+                return Pair.create(0, values);
+            }
+            else
+            {
+                for (BoundColumn partitionBoundColumn1 : partitionBoundColumns)
+                    values.add(partitionBoundColumn1.value);
+
+                if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null)
+                {
+                    // query token(k) > token(pre_partition_key) and token(k) <= end_token
+                    values.add(partitioner.getTokenValidator().fromString(split.getEndToken()));
+                    return Pair.create(1, values);
+                }
+                else
+                {
+                    // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n
+                    int type = preparedQueryBindValues(clusterColumns, 0, values);
+                    return Pair.create(type, values);
+                }
+            }
+        }
+
+        /** recursively compose the query binding variables */
+        private int preparedQueryBindValues(List<BoundColumn> column, int position, List<ByteBuffer> bindValues)
+        {
+            if (position == column.size() - 1 || column.get(position + 1).value == null)
+            {
+                bindValues.add(column.get(position).value);
+                return position + 2;
+            }
+            else
+            {
+                bindValues.add(column.get(position).value);
+                return preparedQueryBindValues(column, position + 1, bindValues);
+            }
+        }
+
+        /**  get the prepared query item Id  */
+        private int prepareQuery(int type) throws InvalidRequestException, TException
+        {
+            Integer itemId = preparedQueryIds.get(type);
+            if (itemId != null)
+                return itemId;
+
+            Pair<Integer, String> query = composeQuery(columns);
+            logger.debug("type:" + query.left + ", query: " + query.right);
+            CqlPreparedResult cqlPreparedResult = client.prepare_cql3_query(ByteBufferUtil.bytes(query.right), Compression.NONE);
+            preparedQueryIds.put(query.left, cqlPreparedResult.itemId);
+            return cqlPreparedResult.itemId;
+        }
+
+        /** execute the prepared query */
+        private void executeQuery()
+        {
+            Pair<Integer, List<ByteBuffer>> bindValues = preparedQueryBindValues();
+            logger.debug("query type: " + bindValues.left);
+
+            // check whether we have reached the end of the range for a type 1 query (CASSANDRA-5573)
+            if (bindValues.left == 1 && reachEndRange())
+            {
+                rows = null;
+                return;
+            }
+
+            int retries = 0;
+            // retry at most three times on TimedOutException and UnavailableException
+            while (retries < 3)
+            {
+                try
+                {
+                    CqlResult cqlResult = client.execute_prepared_cql3_query(prepareQuery(bindValues.left), bindValues.right, consistencyLevel);
+                    if (cqlResult != null && cqlResult.rows != null)
+                        rows = cqlResult.rows.iterator();
+                    return;
+                }
+                catch (TimedOutException e)
+                {
+                    retries++;
+                    if (retries >= 3)
+                    {
+                        rows = null;
+                        RuntimeException rte = new RuntimeException(e.getMessage());
+                        rte.initCause(e);
+                        throw rte;
+                    }
+                }
+                catch (UnavailableException e)
+                {
+                    retries++;
+                    if (retries >= 3)
+                    {
+                        rows = null;
+                        RuntimeException rte = new RuntimeException(e.getMessage());
+                        rte.initCause(e);
+                        throw rte;
+                    }
+                }
+                catch (Exception e)
+                {
+                    rows = null;
+                    RuntimeException rte = new RuntimeException(e.getMessage());
+                    rte.initCause(e);
+                    throw rte;
+                }
+            }
+        }
+    }
+
+    /** retrieve the partition keys and cluster keys from system.schema_columnfamilies table */
+    private void retrieveKeys() throws Exception
+    {
+        String query = "select key_aliases," +
+                       "column_aliases, " +
+                       "key_validator, " +
+                       "comparator " +
+                       "from system.schema_columnfamilies " +
+                       "where keyspace_name='%s' and columnfamily_name='%s'";
+        String formatted = String.format(query, keyspace, cfName);
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+
+        CqlRow cqlRow = result.rows.get(0);
+        String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+        logger.debug("partition keys: " + keyString);
+        List<String> keys = FBUtilities.fromJsonList(keyString);
+
+        for (String key : keys)
+            partitionBoundColumns.add(new BoundColumn(key));
+
+        keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+        logger.debug("cluster columns: " + keyString);
+        keys = FBUtilities.fromJsonList(keyString);
+
+        for (String key : keys)
+            clusterColumns.add(new BoundColumn(key));
+
+        Column rawKeyValidator = cqlRow.columns.get(2);
+        String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
+        logger.debug("row key validator: " + validator);
+        keyValidator = parseType(validator);
+
+        if (keyValidator instanceof CompositeType)
+        {
+            List<AbstractType<?>> types = ((CompositeType) keyValidator).types;
+            for (int i = 0; i < partitionBoundColumns.size(); i++)
+                partitionBoundColumns.get(i).validator = types.get(i);
+        }
+        else
+        {
+            partitionBoundColumns.get(0).validator = keyValidator;
+        }
+    }
+
+    /** check whether current row is at the end of range */
+    private boolean reachEndRange()
+    {
+        // current row key
+        ByteBuffer rowKey;
+        if (keyValidator instanceof CompositeType)
+        {
+            ByteBuffer[] keys = new ByteBuffer[partitionBoundColumns.size()];
+            for (int i = 0; i < partitionBoundColumns.size(); i++)
+                keys[i] = partitionBoundColumns.get(i).value.duplicate();
+
+            rowKey = ((CompositeType) keyValidator).build(keys);
+        }
+        else
+        {
+            rowKey = partitionBoundColumns.get(0).value;
+        }
+
+        String endToken = split.getEndToken();
+        String currentToken = partitioner.getToken(rowKey).toString();
+        logger.debug("End token: " + endToken + ", current token: " + currentToken);
+
+        return endToken.equals(currentToken);
+    }
+
+    private static AbstractType<?> parseType(String type) throws IOException
+    {
+        try
+        {
+            // always treat counters like longs, specifically CCT.compose is not what we need
+            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+                return LongType.instance;
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+        catch (SyntaxException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    private class BoundColumn
+    {
+        final String name;
+        ByteBuffer value;
+        AbstractType<?> validator;
+
+        public BoundColumn(String name)
+        {
+            this.name = name;
+        }
+    }
+    
+    /** get a string from a ByteBuffer; catch any CharacterCodingException and rethrow it as a runtime exception */
+    private static String stringValue(ByteBuffer value)
+    {
+        try
+        {
+            return ByteBufferUtil.string(value);
+        }
+        catch (CharacterCodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
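
For orientation, the reader above pulls all of its settings from the Hadoop job Configuration through ConfigHelper and CQLConfigHelper. A minimal job setup sketch follows; the keyspace/table names and column values are placeholders, and the CQLConfigHelper setter names (setInputColumns, setInputWhereClauses, setInputPageRowSize) are assumed to mirror the getters used by the reader rather than taken from this patch:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CQLConfigHelper;
    import org.apache.cassandra.hadoop.cql3.ColumnFamilyInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class Cql3InputJobSetup
    {
        public static Job configure(Configuration conf) throws Exception
        {
            Job job = new Job(conf, "cql3-input-example");
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            Configuration jobConf = job.getConfiguration();
            // connection and table settings read by the record reader (values are placeholders)
            ConfigHelper.setInputColumnFamily(jobConf, "my_keyspace", "my_table");
            ConfigHelper.setInputInitialAddress(jobConf, "127.0.0.1");
            ConfigHelper.setInputRpcPort(jobConf, "9160");
            ConfigHelper.setInputPartitioner(jobConf, "org.apache.cassandra.dht.Murmur3Partitioner");

            // CQL3-specific settings; these setter names are assumptions mirroring the getters above
            CQLConfigHelper.setInputColumns(jobConf, "id, count");
            CQLConfigHelper.setInputWhereClauses(jobConf, "count > 0");
            CQLConfigHelper.setInputPageRowSize(jobConf, "1000");

            return job;
        }
    }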