You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 00:42:31 UTC

[1/3] git commit: update CqlRecordWriter interface; patch by Alex Liu and jbellis for CASSANDRA-5622

Updated Branches:
  refs/heads/cassandra-1.2 0bff5f57d -> f1004e9b1
  refs/heads/trunk 8bf6e1559 -> 670954cb3


update CqlRecordWriter interface
patch by Alex Liu and jbellis for CASSANDRA-5622


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1004e9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1004e9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1004e9b

Branch: refs/heads/cassandra-1.2
Commit: f1004e9b175d6593b816e72058b2dbea14257a43
Parents: 0bff5f5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:41:58 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:41:58 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |   9 +-
 .../AbstractColumnFamilyRecordWriter.java       |   4 +-
 .../hadoop/ColumnFamilyRecordWriter.java        |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 119 ++++++++++---------
 5 files changed, 67 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c28d4d7..2bba0ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 1.2.6
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
- * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421)
+ * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
  * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
  * Ignore pre-truncate hints (CASSANDRA-4655)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 611f9c2..c92f047 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -166,9 +166,7 @@ public class WordCount extends Configured implements Tool
         private List<ByteBuffer> getBindVariables(Text word, int sum)
         {
             List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
-            variables.add(keys.get("row_id1"));
-            variables.add(keys.get("row_id2"));
-            variables.add(ByteBufferUtil.bytes(word.toString()));
+            keys.put("word", ByteBufferUtil.bytes(word.toString()));
             variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
             return variables;
         }
@@ -210,9 +208,8 @@ public class WordCount extends Configured implements Tool
 
             ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
             job.getConfiguration().set(PRIMARY_KEY, "word,sum");
-            String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
-                           " (row_id1, row_id2, word, count_num) " +
-                           " values (?, ?, ?, ?)";
+            String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
+                           " SET count_num = ? ";
             CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
             ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 6428db3..456130d 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -109,7 +109,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
         // The list of endpoints for this range
         protected final List<InetAddress> endpoints;
         // A bounded queue of incoming mutations for this range
-        protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize);
+        protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
 
         protected volatile boolean run = true;
         // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
@@ -132,7 +132,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
         /**
          * enqueues the given value to Cassandra
          */
-        public void put(Pair<ByteBuffer, K> value) throws IOException
+        public void put(K value) throws IOException
         {
             while (true)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 50ec059..6823342 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -134,7 +134,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<Mutation>
+    public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
     {
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
         

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dde6b1f..642d8c4 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.hadoop.cql3;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.cassandra.thrift.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
@@ -38,15 +37,13 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -75,7 +72,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
     private final String cql;
 
     private AbstractType<?> keyValidator;
-    private String [] partitionkeys;
+    private String [] partitionKeyColumns;
+    private List<String> clusterColumns;
 
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -96,30 +94,30 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         this.progressable = progressable;
     }
 
-    CqlRecordWriter(Configuration conf) throws IOException
+    CqlRecordWriter(Configuration conf)
     {
         super(conf);
         this.clients = new HashMap<Range, RangeClient>();
-        cql = CqlConfigHelper.getOutputCql(conf);
 
         try
         {
-            String host = getAnyHost();
-            int port = ConfigHelper.getOutputRpcPort(conf);
-            Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
             retrievePartitionKeyValidator(client);
-            
+            String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+            if (cqlQuery.toLowerCase().startsWith("insert"))
+                throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+            cql = appendKeyWhereClauses(cqlQuery);
+
             if (client != null)
             {
                 TTransport transport = client.getOutputProtocol().getTransport();
                 if (transport.isOpen())
                     transport.close();
-                client = null;
             }
         }
         catch (Exception e)
         {
-            throw new IOException(e);
+            throw new RuntimeException(e);
         }
     }
     
@@ -161,8 +159,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
     @Override
     public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
     {
-        ByteBuffer rowKey = getRowKey(keyColumns);
-        Range<Token> range = ringCache.getRange(rowKey);
+        Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
         RangeClient client = clients.get(range);
@@ -174,7 +171,14 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
             clients.put(range, client);
         }
 
-        client.put(Pair.create(rowKey, values));
+        // add primary key columns to the bind variables
+        List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
+        for (String column : partitionKeyColumns)
+            allValues.add(keyColumns.get(column));
+        for (String column : clusterColumns)
+            allValues.add(keyColumns.get(column));
+
+        client.put(allValues);
         progressable.progress();
     }
 
@@ -201,10 +205,10 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
             outer:
             while (run || !queue.isEmpty())
             {
-                Pair<ByteBuffer, List<ByteBuffer>> item;
+                List<ByteBuffer> bindVariables;
                 try
                 {
-                    item = queue.take();
+                    bindVariables = queue.take();
                 }
                 catch (InterruptedException e)
                 {
@@ -220,16 +224,15 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
                     {
                         int i = 0;
                         int itemId = preparedStatement(client);
-                        while (item != null)
+                        while (bindVariables != null)
                         {
-                            List<ByteBuffer> bindVariables = item.right;
                             client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
                             i++;
                             
                             if (i >= batchThreshold)
                                 break;
                             
-                            item = queue.poll();
+                            bindVariables = queue.poll();
                         }
                         
                         break;
@@ -294,23 +297,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         }
     }
 
-    private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
+    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
     {
-        //current row key
-        ByteBuffer rowKey;
+        ByteBuffer partitionKey;
         if (keyValidator instanceof CompositeType)
         {
-            ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
+            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
             for (int i = 0; i< keys.length; i++)
-                keys[i] = keyColumns.get(partitionkeys[i]);
+                keys[i] = keyColumns.get(partitionKeyColumns[i]);
 
-            rowKey = ((CompositeType) keyValidator).build(keys);
+            partitionKey = ((CompositeType) keyValidator).build(keys);
         }
         else
         {
-            rowKey = keyColumns.get(partitionkeys[0]);
+            partitionKey = keyColumns.get(partitionKeyColumns[0]);
         }
-        return rowKey;
+        return partitionKey;
     }
 
     /** retrieve the key validator from system.schema_columnfamilies table */
@@ -319,7 +321,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
         String cfName = ConfigHelper.getOutputColumnFamily(conf);
         String query = "SELECT key_validator," +
-        		       "       key_aliases " +
+        		       "       key_aliases," +
+        		       "       column_aliases " +
                        "FROM system.schema_columnfamilies " +
                        "WHERE keyspace_name='%s' and columnfamily_name='%s'";
         String formatted = String.format(query, keyspace, cfName);
@@ -334,16 +337,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         logger.debug("partition keys: " + keyString);
 
         List<String> keys = FBUtilities.fromJsonList(keyString);
-        partitionkeys = new String[keys.size()];
+        partitionKeyColumns = new String[keys.size()];
         int i = 0;
         for (String key : keys)
         {
-            partitionkeys[i] = key;
+            partitionKeyColumns[i] = key;
             i++;
         }
+
+        Column rawClusterColumns = result.rows.get(0).columns.get(2);
+        String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
+
+        logger.debug("cluster columns: " + clusterColumnString);
+        clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
     }
 
-    private AbstractType<?> parseType(String type) throws IOException
+    private AbstractType<?> parseType(String type) throws ConfigurationException
     {
         try
         {
@@ -352,32 +361,24 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
                 return LongType.instance;
             return TypeParser.parse(type);
         }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
         catch (SyntaxException e)
         {
-            throw new IOException(e);
+            throw new ConfigurationException(e.getMessage(), e);
         }
     }
-    
-    private String getAnyHost() throws IOException, InvalidRequestException, TException
+
+    /**
+     * add where clauses for partition keys and cluster columns
+     */
+    private String appendKeyWhereClauses(String cqlQuery)
     {
-        Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
-        List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
-        try
-        {
-            for (TokenRange range : ring)
-                return range.endpoints.get(0);
-        }
-        finally
-        {
-            TTransport transport = client.getOutputProtocol().getTransport();
-            if (transport.isOpen())
-                transport.close();
-        }
-        throw new IOException("There are no endpoints");
-    }
+        String keyWhereClause = "";
+
+        for (String partitionKey : partitionKeyColumns)
+            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? partitionKey : (" AND " + partitionKey));
+        for (String clusterColumn : clusterColumns)
+            keyWhereClause += " AND " + clusterColumn + " = ?";
 
+        return cqlQuery + " WHERE " + keyWhereClause;
+    }
 }


[3/3] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/670954cb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/670954cb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/670954cb

Branch: refs/heads/trunk
Commit: 670954cb3e6a1021c03884ffc4756caab0fba576
Parents: 8bf6e15 f1004e9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:42:26 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:42:26 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop_cql3_word_count/src/WordCount.java   |   9 +-
 .../AbstractColumnFamilyRecordWriter.java       |   4 +-
 .../hadoop/ColumnFamilyRecordWriter.java        |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 119 ++++++++++---------
 5 files changed, 67 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/670954cb/CHANGES.txt
----------------------------------------------------------------------


[2/3] git commit: update CqlRecordWriter interface; patch by Alex Liu and jbellis for CASSANDRA-5622

Posted by jb...@apache.org.
update CqlRecordWriter interface
patch by Alex Liu and jbellis for CASSANDRA-5622


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1004e9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1004e9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1004e9b

Branch: refs/heads/trunk
Commit: f1004e9b175d6593b816e72058b2dbea14257a43
Parents: 0bff5f5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:41:58 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:41:58 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |   9 +-
 .../AbstractColumnFamilyRecordWriter.java       |   4 +-
 .../hadoop/ColumnFamilyRecordWriter.java        |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 119 ++++++++++---------
 5 files changed, 67 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c28d4d7..2bba0ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 1.2.6
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
- * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421)
+ * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
  * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
  * Ignore pre-truncate hints (CASSANDRA-4655)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 611f9c2..c92f047 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -166,9 +166,7 @@ public class WordCount extends Configured implements Tool
         private List<ByteBuffer> getBindVariables(Text word, int sum)
         {
             List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
-            variables.add(keys.get("row_id1"));
-            variables.add(keys.get("row_id2"));
-            variables.add(ByteBufferUtil.bytes(word.toString()));
+            keys.put("word", ByteBufferUtil.bytes(word.toString()));
             variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
             return variables;
         }
@@ -210,9 +208,8 @@ public class WordCount extends Configured implements Tool
 
             ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
             job.getConfiguration().set(PRIMARY_KEY, "word,sum");
-            String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
-                           " (row_id1, row_id2, word, count_num) " +
-                           " values (?, ?, ?, ?)";
+            String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
+                           " SET count_num = ? ";
             CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
             ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 6428db3..456130d 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -109,7 +109,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
         // The list of endpoints for this range
         protected final List<InetAddress> endpoints;
         // A bounded queue of incoming mutations for this range
-        protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize);
+        protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
 
         protected volatile boolean run = true;
         // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
@@ -132,7 +132,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
         /**
          * enqueues the given value to Cassandra
          */
-        public void put(Pair<ByteBuffer, K> value) throws IOException
+        public void put(K value) throws IOException
         {
             while (true)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 50ec059..6823342 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -134,7 +134,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<Mutation>
+    public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
     {
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
         

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dde6b1f..642d8c4 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.hadoop.cql3;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.cassandra.thrift.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
@@ -38,15 +37,13 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -75,7 +72,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
     private final String cql;
 
     private AbstractType<?> keyValidator;
-    private String [] partitionkeys;
+    private String [] partitionKeyColumns;
+    private List<String> clusterColumns;
 
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -96,30 +94,30 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         this.progressable = progressable;
     }
 
-    CqlRecordWriter(Configuration conf) throws IOException
+    CqlRecordWriter(Configuration conf)
     {
         super(conf);
         this.clients = new HashMap<Range, RangeClient>();
-        cql = CqlConfigHelper.getOutputCql(conf);
 
         try
         {
-            String host = getAnyHost();
-            int port = ConfigHelper.getOutputRpcPort(conf);
-            Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
             retrievePartitionKeyValidator(client);
-            
+            String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+            if (cqlQuery.toLowerCase().startsWith("insert"))
+                throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+            cql = appendKeyWhereClauses(cqlQuery);
+
             if (client != null)
             {
                 TTransport transport = client.getOutputProtocol().getTransport();
                 if (transport.isOpen())
                     transport.close();
-                client = null;
             }
         }
         catch (Exception e)
         {
-            throw new IOException(e);
+            throw new RuntimeException(e);
         }
     }
     
@@ -161,8 +159,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
     @Override
     public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
     {
-        ByteBuffer rowKey = getRowKey(keyColumns);
-        Range<Token> range = ringCache.getRange(rowKey);
+        Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
         RangeClient client = clients.get(range);
@@ -174,7 +171,14 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
             clients.put(range, client);
         }
 
-        client.put(Pair.create(rowKey, values));
+        // add primary key columns to the bind variables
+        List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
+        for (String column : partitionKeyColumns)
+            allValues.add(keyColumns.get(column));
+        for (String column : clusterColumns)
+            allValues.add(keyColumns.get(column));
+
+        client.put(allValues);
         progressable.progress();
     }
 
@@ -201,10 +205,10 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
             outer:
             while (run || !queue.isEmpty())
             {
-                Pair<ByteBuffer, List<ByteBuffer>> item;
+                List<ByteBuffer> bindVariables;
                 try
                 {
-                    item = queue.take();
+                    bindVariables = queue.take();
                 }
                 catch (InterruptedException e)
                 {
@@ -220,16 +224,15 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
                     {
                         int i = 0;
                         int itemId = preparedStatement(client);
-                        while (item != null)
+                        while (bindVariables != null)
                         {
-                            List<ByteBuffer> bindVariables = item.right;
                             client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
                             i++;
                             
                             if (i >= batchThreshold)
                                 break;
                             
-                            item = queue.poll();
+                            bindVariables = queue.poll();
                         }
                         
                         break;
@@ -294,23 +297,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         }
     }
 
-    private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
+    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
     {
-        //current row key
-        ByteBuffer rowKey;
+        ByteBuffer partitionKey;
         if (keyValidator instanceof CompositeType)
         {
-            ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
+            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
             for (int i = 0; i< keys.length; i++)
-                keys[i] = keyColumns.get(partitionkeys[i]);
+                keys[i] = keyColumns.get(partitionKeyColumns[i]);
 
-            rowKey = ((CompositeType) keyValidator).build(keys);
+            partitionKey = ((CompositeType) keyValidator).build(keys);
         }
         else
         {
-            rowKey = keyColumns.get(partitionkeys[0]);
+            partitionKey = keyColumns.get(partitionKeyColumns[0]);
         }
-        return rowKey;
+        return partitionKey;
     }
 
     /** retrieve the key validator from system.schema_columnfamilies table */
@@ -319,7 +321,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
         String cfName = ConfigHelper.getOutputColumnFamily(conf);
         String query = "SELECT key_validator," +
-        		       "       key_aliases " +
+        		       "       key_aliases," +
+        		       "       column_aliases " +
                        "FROM system.schema_columnfamilies " +
                        "WHERE keyspace_name='%s' and columnfamily_name='%s'";
         String formatted = String.format(query, keyspace, cfName);
@@ -334,16 +337,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         logger.debug("partition keys: " + keyString);
 
         List<String> keys = FBUtilities.fromJsonList(keyString);
-        partitionkeys = new String[keys.size()];
+        partitionKeyColumns = new String[keys.size()];
         int i = 0;
         for (String key : keys)
         {
-            partitionkeys[i] = key;
+            partitionKeyColumns[i] = key;
             i++;
         }
+
+        Column rawClusterColumns = result.rows.get(0).columns.get(2);
+        String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
+
+        logger.debug("cluster columns: " + clusterColumnString);
+        clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
     }
 
-    private AbstractType<?> parseType(String type) throws IOException
+    private AbstractType<?> parseType(String type) throws ConfigurationException
     {
         try
         {
@@ -352,32 +361,24 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
                 return LongType.instance;
             return TypeParser.parse(type);
         }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
         catch (SyntaxException e)
         {
-            throw new IOException(e);
+            throw new ConfigurationException(e.getMessage(), e);
         }
     }
-    
-    private String getAnyHost() throws IOException, InvalidRequestException, TException
+
+    /**
+     * add where clauses for partition keys and cluster columns
+     */
+    private String appendKeyWhereClauses(String cqlQuery)
     {
-        Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
-        List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
-        try
-        {
-            for (TokenRange range : ring)
-                return range.endpoints.get(0);
-        }
-        finally
-        {
-            TTransport transport = client.getOutputProtocol().getTransport();
-            if (transport.isOpen())
-                transport.close();
-        }
-        throw new IOException("There are no endpoints");
-    }
+        String keyWhereClause = "";
+
+        for (String partitionKey : partitionKeyColumns)
+            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? partitionKey : (" AND " + partitionKey));
+        for (String clusterColumn : clusterColumns)
+            keyWhereClause += " AND " + clusterColumn + " = ?";
 
+        return cqlQuery + " WHERE " + keyWhereClause;
+    }
 }