You are viewing a plain-text version of this content. The link to the canonical version (originally hyperlinked on the word "here") was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/18 00:42:31 UTC
[1/3] git commit: update CqlRecordWriter interface patch by Alex Liu and jbellis for CASSANDRA-5622
Updated Branches:
refs/heads/cassandra-1.2 0bff5f57d -> f1004e9b1
refs/heads/trunk 8bf6e1559 -> 670954cb3
update CqlRecordWriter interface
patch by Alex Liu and jbellis for CASSANDRA-5622
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1004e9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1004e9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1004e9b
Branch: refs/heads/cassandra-1.2
Commit: f1004e9b175d6593b816e72058b2dbea14257a43
Parents: 0bff5f5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:41:58 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:41:58 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../hadoop_cql3_word_count/src/WordCount.java | 9 +-
.../AbstractColumnFamilyRecordWriter.java | 4 +-
.../hadoop/ColumnFamilyRecordWriter.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 119 ++++++++++---------
5 files changed, 67 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c28d4d7..2bba0ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
1.2.6
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
- * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421)
+ * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
* Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
* Ignore pre-truncate hints (CASSANDRA-4655)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 611f9c2..c92f047 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -166,9 +166,7 @@ public class WordCount extends Configured implements Tool
private List<ByteBuffer> getBindVariables(Text word, int sum)
{
List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
- variables.add(keys.get("row_id1"));
- variables.add(keys.get("row_id2"));
- variables.add(ByteBufferUtil.bytes(word.toString()));
+ keys.put("word", ByteBufferUtil.bytes(word.toString()));
variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
return variables;
}
@@ -210,9 +208,8 @@ public class WordCount extends Configured implements Tool
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
job.getConfiguration().set(PRIMARY_KEY, "word,sum");
- String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
- " (row_id1, row_id2, word, count_num) " +
- " values (?, ?, ?, ?)";
+ String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
+ " SET count_num = ? ";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 6428db3..456130d 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -109,7 +109,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
// The list of endpoints for this range
protected final List<InetAddress> endpoints;
// A bounded queue of incoming mutations for this range
- protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize);
+ protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
protected volatile boolean run = true;
// we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
@@ -132,7 +132,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
/**
* enqueues the given value to Cassandra
*/
- public void put(Pair<ByteBuffer, K> value) throws IOException
+ public void put(K value) throws IOException
{
while (true)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 50ec059..6823342 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -134,7 +134,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Mutations for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<Mutation>
+ public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
{
public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dde6b1f..642d8c4 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.hadoop.cql3;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.cassandra.thrift.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
@@ -38,15 +37,13 @@ import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
@@ -75,7 +72,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
private final String cql;
private AbstractType<?> keyValidator;
- private String [] partitionkeys;
+ private String [] partitionKeyColumns;
+ private List<String> clusterColumns;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -96,30 +94,30 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
this.progressable = progressable;
}
- CqlRecordWriter(Configuration conf) throws IOException
+ CqlRecordWriter(Configuration conf)
{
super(conf);
this.clients = new HashMap<Range, RangeClient>();
- cql = CqlConfigHelper.getOutputCql(conf);
try
{
- String host = getAnyHost();
- int port = ConfigHelper.getOutputRpcPort(conf);
- Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
retrievePartitionKeyValidator(client);
-
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+
if (client != null)
{
TTransport transport = client.getOutputProtocol().getTransport();
if (transport.isOpen())
transport.close();
- client = null;
}
}
catch (Exception e)
{
- throw new IOException(e);
+ throw new RuntimeException(e);
}
}
@@ -161,8 +159,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
@Override
public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
{
- ByteBuffer rowKey = getRowKey(keyColumns);
- Range<Token> range = ringCache.getRange(rowKey);
+ Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
RangeClient client = clients.get(range);
@@ -174,7 +171,14 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
clients.put(range, client);
}
- client.put(Pair.create(rowKey, values));
+ // add primary key columns to the bind variables
+ List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
+ for (String column : partitionKeyColumns)
+ allValues.add(keyColumns.get(column));
+ for (String column : clusterColumns)
+ allValues.add(keyColumns.get(column));
+
+ client.put(allValues);
progressable.progress();
}
@@ -201,10 +205,10 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
outer:
while (run || !queue.isEmpty())
{
- Pair<ByteBuffer, List<ByteBuffer>> item;
+ List<ByteBuffer> bindVariables;
try
{
- item = queue.take();
+ bindVariables = queue.take();
}
catch (InterruptedException e)
{
@@ -220,16 +224,15 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
{
int i = 0;
int itemId = preparedStatement(client);
- while (item != null)
+ while (bindVariables != null)
{
- List<ByteBuffer> bindVariables = item.right;
client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
i++;
if (i >= batchThreshold)
break;
- item = queue.poll();
+ bindVariables = queue.poll();
}
break;
@@ -294,23 +297,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
}
}
- private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
+ private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
{
- //current row key
- ByteBuffer rowKey;
+ ByteBuffer partitionKey;
if (keyValidator instanceof CompositeType)
{
- ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
+ ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
for (int i = 0; i< keys.length; i++)
- keys[i] = keyColumns.get(partitionkeys[i]);
+ keys[i] = keyColumns.get(partitionKeyColumns[i]);
- rowKey = ((CompositeType) keyValidator).build(keys);
+ partitionKey = ((CompositeType) keyValidator).build(keys);
}
else
{
- rowKey = keyColumns.get(partitionkeys[0]);
+ partitionKey = keyColumns.get(partitionKeyColumns[0]);
}
- return rowKey;
+ return partitionKey;
}
/** retrieve the key validator from system.schema_columnfamilies table */
@@ -319,7 +321,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
String keyspace = ConfigHelper.getOutputKeyspace(conf);
String cfName = ConfigHelper.getOutputColumnFamily(conf);
String query = "SELECT key_validator," +
- " key_aliases " +
+ " key_aliases," +
+ " column_aliases " +
"FROM system.schema_columnfamilies " +
"WHERE keyspace_name='%s' and columnfamily_name='%s'";
String formatted = String.format(query, keyspace, cfName);
@@ -334,16 +337,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
logger.debug("partition keys: " + keyString);
List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionkeys = new String[keys.size()];
+ partitionKeyColumns = new String[keys.size()];
int i = 0;
for (String key : keys)
{
- partitionkeys[i] = key;
+ partitionKeyColumns[i] = key;
i++;
}
+
+ Column rawClusterColumns = result.rows.get(0).columns.get(2);
+ String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
+
+ logger.debug("cluster columns: " + clusterColumnString);
+ clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
}
- private AbstractType<?> parseType(String type) throws IOException
+ private AbstractType<?> parseType(String type) throws ConfigurationException
{
try
{
@@ -352,32 +361,24 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
return LongType.instance;
return TypeParser.parse(type);
}
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
catch (SyntaxException e)
{
- throw new IOException(e);
+ throw new ConfigurationException(e.getMessage(), e);
}
}
-
- private String getAnyHost() throws IOException, InvalidRequestException, TException
+
+ /**
+ * add where clauses for partition keys and cluster columns
+ */
+ private String appendKeyWhereClauses(String cqlQuery)
{
- Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
- List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
- try
- {
- for (TokenRange range : ring)
- return range.endpoints.get(0);
- }
- finally
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- }
- throw new IOException("There are no endpoints");
- }
+ String keyWhereClause = "";
+
+ for (String partitionKey : partitionKeyColumns)
+ keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? partitionKey : (" AND " + partitionKey));
+ for (String clusterColumn : clusterColumns)
+ keyWhereClause += " AND " + clusterColumn + " = ?";
+ return cqlQuery + " WHERE " + keyWhereClause;
+ }
}
[3/3] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/670954cb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/670954cb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/670954cb
Branch: refs/heads/trunk
Commit: 670954cb3e6a1021c03884ffc4756caab0fba576
Parents: 8bf6e15 f1004e9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:42:26 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:42:26 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop_cql3_word_count/src/WordCount.java | 9 +-
.../AbstractColumnFamilyRecordWriter.java | 4 +-
.../hadoop/ColumnFamilyRecordWriter.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 119 ++++++++++---------
5 files changed, 67 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/670954cb/CHANGES.txt
----------------------------------------------------------------------
[2/3] git commit: update CqlRecordWriter interface patch by Alex Liu and jbellis for CASSANDRA-5622
Posted by jb...@apache.org.
update CqlRecordWriter interface
patch by Alex Liu and jbellis for CASSANDRA-5622
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1004e9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1004e9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1004e9b
Branch: refs/heads/trunk
Commit: f1004e9b175d6593b816e72058b2dbea14257a43
Parents: 0bff5f5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jun 17 17:41:58 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jun 17 17:41:58 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../hadoop_cql3_word_count/src/WordCount.java | 9 +-
.../AbstractColumnFamilyRecordWriter.java | 4 +-
.../hadoop/ColumnFamilyRecordWriter.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 119 ++++++++++---------
5 files changed, 67 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c28d4d7..2bba0ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
1.2.6
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
- * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421)
+ * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
* Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
* Ignore pre-truncate hints (CASSANDRA-4655)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 611f9c2..c92f047 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -166,9 +166,7 @@ public class WordCount extends Configured implements Tool
private List<ByteBuffer> getBindVariables(Text word, int sum)
{
List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
- variables.add(keys.get("row_id1"));
- variables.add(keys.get("row_id2"));
- variables.add(ByteBufferUtil.bytes(word.toString()));
+ keys.put("word", ByteBufferUtil.bytes(word.toString()));
variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
return variables;
}
@@ -210,9 +208,8 @@ public class WordCount extends Configured implements Tool
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
job.getConfiguration().set(PRIMARY_KEY, "word,sum");
- String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
- " (row_id1, row_id2, word, count_num) " +
- " values (?, ?, ?, ?)";
+ String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
+ " SET count_num = ? ";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 6428db3..456130d 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -109,7 +109,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
// The list of endpoints for this range
protected final List<InetAddress> endpoints;
// A bounded queue of incoming mutations for this range
- protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize);
+ protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
protected volatile boolean run = true;
// we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
@@ -132,7 +132,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
/**
* enqueues the given value to Cassandra
*/
- public void put(Pair<ByteBuffer, K> value) throws IOException
+ public void put(K value) throws IOException
{
while (true)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 50ec059..6823342 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -134,7 +134,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Mutations for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<Mutation>
+ public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
{
public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dde6b1f..642d8c4 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.hadoop.cql3;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.cassandra.thrift.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
@@ -38,15 +37,13 @@ import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.Progressable;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
@@ -75,7 +72,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
private final String cql;
private AbstractType<?> keyValidator;
- private String [] partitionkeys;
+ private String [] partitionKeyColumns;
+ private List<String> clusterColumns;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -96,30 +94,30 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
this.progressable = progressable;
}
- CqlRecordWriter(Configuration conf) throws IOException
+ CqlRecordWriter(Configuration conf)
{
super(conf);
this.clients = new HashMap<Range, RangeClient>();
- cql = CqlConfigHelper.getOutputCql(conf);
try
{
- String host = getAnyHost();
- int port = ConfigHelper.getOutputRpcPort(conf);
- Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
retrievePartitionKeyValidator(client);
-
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+
if (client != null)
{
TTransport transport = client.getOutputProtocol().getTransport();
if (transport.isOpen())
transport.close();
- client = null;
}
}
catch (Exception e)
{
- throw new IOException(e);
+ throw new RuntimeException(e);
}
}
@@ -161,8 +159,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
@Override
public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
{
- ByteBuffer rowKey = getRowKey(keyColumns);
- Range<Token> range = ringCache.getRange(rowKey);
+ Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
RangeClient client = clients.get(range);
@@ -174,7 +171,14 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
clients.put(range, client);
}
- client.put(Pair.create(rowKey, values));
+ // add primary key columns to the bind variables
+ List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
+ for (String column : partitionKeyColumns)
+ allValues.add(keyColumns.get(column));
+ for (String column : clusterColumns)
+ allValues.add(keyColumns.get(column));
+
+ client.put(allValues);
progressable.progress();
}
@@ -201,10 +205,10 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
outer:
while (run || !queue.isEmpty())
{
- Pair<ByteBuffer, List<ByteBuffer>> item;
+ List<ByteBuffer> bindVariables;
try
{
- item = queue.take();
+ bindVariables = queue.take();
}
catch (InterruptedException e)
{
@@ -220,16 +224,15 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
{
int i = 0;
int itemId = preparedStatement(client);
- while (item != null)
+ while (bindVariables != null)
{
- List<ByteBuffer> bindVariables = item.right;
client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
i++;
if (i >= batchThreshold)
break;
- item = queue.poll();
+ bindVariables = queue.poll();
}
break;
@@ -294,23 +297,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
}
}
- private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns)
+ private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
{
- //current row key
- ByteBuffer rowKey;
+ ByteBuffer partitionKey;
if (keyValidator instanceof CompositeType)
{
- ByteBuffer[] keys = new ByteBuffer[partitionkeys.length];
+ ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
for (int i = 0; i< keys.length; i++)
- keys[i] = keyColumns.get(partitionkeys[i]);
+ keys[i] = keyColumns.get(partitionKeyColumns[i]);
- rowKey = ((CompositeType) keyValidator).build(keys);
+ partitionKey = ((CompositeType) keyValidator).build(keys);
}
else
{
- rowKey = keyColumns.get(partitionkeys[0]);
+ partitionKey = keyColumns.get(partitionKeyColumns[0]);
}
- return rowKey;
+ return partitionKey;
}
/** retrieve the key validator from system.schema_columnfamilies table */
@@ -319,7 +321,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
String keyspace = ConfigHelper.getOutputKeyspace(conf);
String cfName = ConfigHelper.getOutputColumnFamily(conf);
String query = "SELECT key_validator," +
- " key_aliases " +
+ " key_aliases," +
+ " column_aliases " +
"FROM system.schema_columnfamilies " +
"WHERE keyspace_name='%s' and columnfamily_name='%s'";
String formatted = String.format(query, keyspace, cfName);
@@ -334,16 +337,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
logger.debug("partition keys: " + keyString);
List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionkeys = new String[keys.size()];
+ partitionKeyColumns = new String[keys.size()];
int i = 0;
for (String key : keys)
{
- partitionkeys[i] = key;
+ partitionKeyColumns[i] = key;
i++;
}
+
+ Column rawClusterColumns = result.rows.get(0).columns.get(2);
+ String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
+
+ logger.debug("cluster columns: " + clusterColumnString);
+ clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
}
- private AbstractType<?> parseType(String type) throws IOException
+ private AbstractType<?> parseType(String type) throws ConfigurationException
{
try
{
@@ -352,32 +361,24 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
return LongType.instance;
return TypeParser.parse(type);
}
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
catch (SyntaxException e)
{
- throw new IOException(e);
+ throw new ConfigurationException(e.getMessage(), e);
}
}
-
- private String getAnyHost() throws IOException, InvalidRequestException, TException
+
+ /**
+ * add where clauses for partition keys and cluster columns
+ */
+ private String appendKeyWhereClauses(String cqlQuery)
{
- Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
- List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
- try
- {
- for (TokenRange range : ring)
- return range.endpoints.get(0);
- }
- finally
- {
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
- }
- throw new IOException("There are no endpoints");
- }
+ String keyWhereClause = "";
+
+ for (String partitionKey : partitionKeyColumns)
+ keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? partitionKey : (" AND " + partitionKey));
+ for (String clusterColumn : clusterColumns)
+ keyWhereClause += " AND " + clusterColumn + " = ?";
+ return cqlQuery + " WHERE " + keyWhereClause;
+ }
}