You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/01 01:16:43 UTC
[2/3] git commit: fix support for Thrift tables in
CqlPagingRecordReader patch by Alex Liu;
reviewed by jbellis for CASSANDRA-5752
fix support for Thrift tables in CqlPagingRecordReader
patch by Alex Liu; reviewed by jbellis for CASSANDRA-5752
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a394210
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a394210
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a394210
Branch: refs/heads/trunk
Commit: 7a39421074d3d14bfc1a4fa1ab986b4fa614f324
Parents: ba274ad
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jul 31 18:15:40 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jul 31 18:16:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../hadoop/cql3/CqlPagingRecordReader.java | 41 ++++++++++++++++++--
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 33 ++++++++++++++++
3 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index da1ec20..377b5a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
* Allow compacting 2Is via nodetool (CASSANDRA-5670)
* Hex-encode non-String keys in OPP (CASSANDRA-5793)
* nodetool history logging (CASSANDRA-5823)
+ * (Hadoop) fix support for Thrift tables in CqlPagingRecordReader
+ (CASSANDRA-5752)
1.2.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index fc07131..db77c9e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -29,6 +29,9 @@ import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
@@ -671,6 +674,11 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
for (String key : keys)
partitionBoundColumns.add(new BoundColumn(key));
+ if (partitionBoundColumns.size() == 0)
+ {
+ retrieveKeysForThriftTables();
+ return;
+ }
keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
logger.debug("cluster columns: {}", keyString);
@@ -679,10 +687,35 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
for (String key : keys)
clusterColumns.add(new BoundColumn(key));
- Column rawKeyValidator = cqlRow.columns.get(2);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- logger.debug("row key validator: {}", validator);
- keyValidator = parseType(validator);
+ parseKeyValidators(ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())));
+ }
+
+ /**
+ * retrieve the fake partition keys and cluster keys for classic thrift table
+ * use CFDefinition to get keys and columns
+ * */
+ private void retrieveKeysForThriftTables() throws Exception
+ {
+ KsDef ksDef = client.describe_keyspace(keyspace);
+ for (CfDef cfDef : ksDef.cf_defs)
+ {
+ if (cfDef.name.equalsIgnoreCase(cfName))
+ {
+ CFMetaData cfMeta = CFMetaData.fromThrift(cfDef);
+ CFDefinition cfDefinition = new CFDefinition(cfMeta);
+ for (ColumnIdentifier columnIdentifier : cfDefinition.keys.keySet())
+ partitionBoundColumns.add(new BoundColumn(columnIdentifier.toString()));
+ parseKeyValidators(cfDef.key_validation_class);
+ return;
+ }
+ }
+ }
+
+ /** parse key validators */
+ private void parseKeyValidators(String rowKeyValidator) throws IOException
+ {
+ logger.debug("row key validator: {} ", rowKeyValidator);
+ keyValidator = parseType(rowKeyValidator);
if (keyValidator instanceof CompositeType)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 612f86a..76d419e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -26,6 +26,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
@@ -337,6 +340,11 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
logger.debug("partition keys: " + keyString);
List<String> keys = FBUtilities.fromJsonList(keyString);
+ if (keys.size() == 0)
+ {
+ retrieveKeysForThriftTables(client);
+ return;
+ }
partitionKeyColumns = new String[keys.size()];
int i = 0;
for (String key : keys)
@@ -352,6 +360,31 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
}
+ /**
+ * retrieve the fake partition keys and cluster keys for classic thrift table
+ * use CFDefinition to get keys and columns
+ * */
+ private void retrieveKeysForThriftTables(Cassandra.Client client) throws Exception
+ {
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ String cfName = ConfigHelper.getOutputColumnFamily(conf);
+ KsDef ksDef = client.describe_keyspace(keyspace);
+ for (CfDef cfDef : ksDef.cf_defs)
+ {
+ if (cfDef.name.equalsIgnoreCase(cfName))
+ {
+ CFMetaData cfMeta = CFMetaData.fromThrift(cfDef);
+ CFDefinition cfDefinition = new CFDefinition(cfMeta);
+ int i = 0;
+ for (ColumnIdentifier column : cfDefinition.keys.keySet())
+ {
+ partitionKeyColumns[i] = column.toString();
+ i++;
+ }
+ return;
+ }
+ }
+ }
private AbstractType<?> parseType(String type) throws ConfigurationException
{
try