You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/06/26 10:51:42 UTC
[3/4] git commit: Support Thrift tables clustering columns on
CqlPagingInputFormat
Support Thrift tables clustering columns on CqlPagingInputFormat
patch by pauloricardomg; reviewed by alexliu68 for CASSANDRA-7445
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9cef44a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9cef44a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9cef44a
Branch: refs/heads/cassandra-2.0
Commit: c9cef44a1a6c10036200f410c8a26942e64c8f12
Parents: 87c4efe
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jun 26 10:40:22 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jun 26 10:40:22 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../hadoop/cql3/CqlPagingRecordReader.java | 16 ++---
.../cassandra/pig/ThriftColumnFamilyTest.java | 61 +++++++++++++++++---
3 files changed, 65 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6adef97..9fbcd9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+1.2.18
+ * Support Thrift tables clustering columns on CqlPagingInputFormat (CASSANDRA-7445)
+
1.2.17
* cqlsh: Fix CompositeType columns in DESCRIBE TABLE output (CASSANDRA-7399)
* Expose global ColmunFamily metrics (CASSANDRA-7273)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index b6e793c..0542f7e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -261,7 +261,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
{
value.clear();
value.putAll(getCurrentValue());
-
+
keys.clear();
keys.putAll(getCurrentKey());
@@ -703,7 +703,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
clusterColumns.add(new BoundColumn(key));
parseKeyValidators(ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())));
-
+
Column rawComparator = cqlRow.columns.get(3);
String comparator = ByteBufferUtil.string(ByteBuffer.wrap(rawComparator.getValue()));
logger.debug("comparator: {}", comparator);
@@ -719,8 +719,8 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
}
}
- /**
- * retrieve the fake partition keys and cluster keys for classic thrift table
+ /**
+ * retrieve the fake partition keys and cluster keys for classic thrift table
* use CFDefinition to get keys and columns
* */
private void retrieveKeysForThriftTables() throws Exception
@@ -732,8 +732,10 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
{
CFMetaData cfMeta = CFMetaData.fromThrift(cfDef);
CFDefinition cfDefinition = new CFDefinition(cfMeta);
- for (ColumnIdentifier columnIdentifier : cfDefinition.keys.keySet())
- partitionBoundColumns.add(new BoundColumn(columnIdentifier.toString()));
+ for (ColumnIdentifier key : cfDefinition.keys.keySet())
+ partitionBoundColumns.add(new BoundColumn(key.toString()));
+ for (ColumnIdentifier column : cfDefinition.columns.keySet())
+ clusterColumns.add(new BoundColumn(column.toString()));
parseKeyValidators(cfDef.key_validation_class);
return;
}
@@ -814,7 +816,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
this.name = name;
}
}
-
+
/** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/
private static String stringValue(ByteBuffer value)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index 223cbf4..6f6aa0b 100644
--- a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -47,7 +47,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class ThriftColumnFamilyTest extends PigTestBase
-{
+{
private static String[] statements = {
"create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
" strategy_options={replication_factor:1};",
@@ -125,7 +125,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
"create column family U8 with " +
"key_validation_class = UTF8Type and " +
"comparator = UTF8Type;",
-
+
"create column family Bytes with " +
"key_validation_class = BytesType and " +
"comparator = UTF8Type;",
@@ -181,7 +181,22 @@ public class ThriftColumnFamilyTest extends PigTestBase
"create column family CompoKeyCopy " +
"with key_validation_class = 'CompositeType(UTF8Type,LongType)' " +
"and default_validation_class = UTF8Type " +
- "and comparator = LongType;"
+ "and comparator = LongType;",
+
+
+ "create column family WideCf " +
+ " with comparator = UTF8Type " +
+ " and default_validation_class = UTF8Type " +
+ " and key_validation_class = UTF8Type " +
+ " and comparator = UTF8Type;",
+
+ "set WideCf['2014-06-06']['1'] = 'event1';",
+ "set WideCf['2014-06-06']['2'] = 'event2';",
+
+ "set WideCf['2014-06-07']['3'] = 'event3';",
+ "set WideCf['2014-06-07']['4'] = 'event4';",
+ "set WideCf['2014-06-07']['5'] = 'event5';",
+ "set WideCf['2014-06-07']['6'] = 'event6';",
};
@BeforeClass
@@ -326,6 +341,36 @@ public class ThriftColumnFamilyTest extends PigTestBase
}
@Test
+ public void testCqlStorageWithThriftWideRowCf() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //regular thrift wide row column family with page size set to 1 to cause CASSANDRA-7445
+ pig.registerQuery("rows = load 'cql://thriftKs/WideCf?" + defaultParameters + "&page_size=1' using CqlStorage();");
+
+ /**
+ "set WideCf['2014-06-06']['1'] = 'event1';",
+ "set WideCf['2014-06-06']['2'] = 'event2';",
+ ---------------------------------------------
+ "set WideCf['2014-06-07']['3'] = 'event3';",
+ "set WideCf['2014-06-07']['4'] = 'event4';",
+ "set WideCf['2014-06-07']['5'] = 'event5';",
+ "set WideCf['2014-06-07']['6'] = 'event6';",
+ */
+
+ Iterator<Tuple> it = pig.openIterator("rows");
+ for (Integer i = 1; i <= 6; i++) {
+ Assert.assertTrue(it.hasNext());
+ Tuple t = it.next();
+ if (i < 3) {
+ Assert.assertEquals(t.get(0).toString(), "2014-06-06");
+ } else {
+ Assert.assertEquals(t.get(0).toString(), "2014-06-07");
+ }
+ Assert.assertEquals(t.get(1).toString(), i.toString());
+ Assert.assertEquals(t.get(2).toString(), "event" + i);
+ }
+ }
+
+ @Test
public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
{
//results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User Qux),(percent,64.7),
@@ -707,7 +752,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
Iterator<Tuple> it = pig.openIterator("compokeys");
if (it.hasNext()) {
Tuple t = it.next();
- Tuple key = (Tuple) t.get(0);
+ Tuple key = (Tuple) t.get(0);
Assert.assertEquals(key.get(0), "clock");
Assert.assertEquals(key.get(1), 40L);
DataBag columns = (DataBag) t.get(1);
@@ -735,7 +780,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
count ++;
if (count == 1)
{
- Tuple key = (Tuple) t.get(0);
+ Tuple key = (Tuple) t.get(0);
Assert.assertEquals(key.get(0), "clock");
Assert.assertEquals(key.get(1), 10L);
DataBag columns = (DataBag) t.get(1);
@@ -749,7 +794,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
}
else if (count == 2)
{
- Tuple key = (Tuple) t.get(0);
+ Tuple key = (Tuple) t.get(0);
Assert.assertEquals(key.get(0), "clock");
Assert.assertEquals(key.get(1), 20L);
DataBag columns = (DataBag) t.get(1);
@@ -763,7 +808,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
}
else if (count == 3)
{
- Tuple key = (Tuple) t.get(0);
+ Tuple key = (Tuple) t.get(0);
Assert.assertEquals(key.get(0), "clock");
Assert.assertEquals(key.get(1), 30L);
DataBag columns = (DataBag) t.get(1);
@@ -777,7 +822,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
}
else if (count == 4)
{
- Tuple key = (Tuple) t.get(0);
+ Tuple key = (Tuple) t.get(0);
Assert.assertEquals(key.get(0), "clock");
Assert.assertEquals(key.get(1), 40L);
DataBag columns = (DataBag) t.get(1);