You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/13 17:43:36 UTC
[3/3] git commit: Pig: support for composite row keys, writing composites. Patch by Dirkjan Bussink, reviewed by brandonwilliams for CASSANDRA-4144
Pig: support for composite row keys, writing composites
Patch by Dirkjan Bussink, reviewed by brandonwilliams for CASSANDRA-4144
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f6cc5ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f6cc5ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f6cc5ef
Branch: refs/heads/cassandra-1.1
Commit: 7f6cc5ef6cad831061dc5cb75f5623f949ab7ca2
Parents: 962b23b
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jul 13 10:38:15 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jul 13 10:38:15 2012 -0500
----------------------------------------------------------------------
examples/pig/test/populate-cli.txt | 20 +++++++
examples/pig/test/test_storage.pig | 17 +++++-
.../cassandra/hadoop/pig/CassandraStorage.java | 44 +++++++++++++-
3 files changed, 76 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/examples/pig/test/populate-cli.txt b/examples/pig/test/populate-cli.txt
index 7ec6cd6..1f59642 100644
--- a/examples/pig/test/populate-cli.txt
+++ b/examples/pig/test/populate-cli.txt
@@ -112,3 +112,23 @@ set CompoInt['clock']['1:0'] = 'z';
set CompoInt['clock']['1:30'] = 'zzzz';
set CompoInt['clock']['2:30'] = 'daddy?';
set CompoInt['clock']['6:30'] = 'coffee...';
+
+create column family CompoIntCopy
+ with key_validation_class = UTF8Type
+ and default_validation_class = UTF8Type
+ and comparator = 'CompositeType(LongType,LongType)';
+
+create column family CompoKey
+ with key_validation_class = 'CompositeType(UTF8Type,LongType)'
+ and default_validation_class = UTF8Type
+ and comparator = LongType;
+
+set CompoKey['clock:10']['1'] = 'z';
+set CompoKey['clock:20']['1'] = 'zzzz';
+set CompoKey['clock:30']['2'] = 'daddy?';
+set CompoKey['clock:40']['6'] = 'coffee...';
+
+create column family CompoKeyCopy
+ with key_validation_class = 'CompositeType(UTF8Type,LongType)'
+ and default_validation_class = UTF8Type
+ and comparator = LongType;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index a0157f7..026cb02 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -67,4 +67,19 @@ night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60
-- What happens at the darkest hour?
darkest = filter night by hour > 2 and hour < 5;
-dump darkest;
\ No newline at end of file
+dump darkest;
+
+compo_int_rows = LOAD 'cassandra://PigTest/CompoInt' USING CassandraStorage();
+STORE compo_int_rows INTO 'cassandra://PigTest/CompoIntCopy' USING CassandraStorage();
+
+--
+-- Test CompositeKey
+--
+
+compokeys = load 'cassandra://PigTest/CompoKey' using CassandraStorage();
+compokeys = filter compokeys by key.$1 == 40;
+
+dump compokeys;
+
+compo_key_rows = LOAD 'cassandra://PigTest/CompoKey' USING CassandraStorage();
+STORE compo_key_rows INTO 'cassandra://PigTest/CompoKeyCopy' USING CassandraStorage();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 5742cb9..454330c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -131,7 +131,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
return limit;
}
-
+
public Tuple getNextWide() throws IOException
{
CfDef cfDef = getCfDef(loadSignature);
@@ -223,10 +223,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
// NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
- Tuple tuple = TupleFactory.getInstance().newTuple(1);
+
+ Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
DefaultDataBag bag = new DefaultDataBag();
- // set the key
- setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key));
+
// we must add all the indexed columns first to match the schema
Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
// take care to iterate these in the same order as the schema does
@@ -283,6 +283,20 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return t;
}
+ private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException // builds a 1-field tuple holding the row key; NOTE(review): the caller passes the key_validation_class type here, so 'comparator' is really the key validator
+ {
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ if( comparator instanceof AbstractCompositeType ) // composite row key: field 0 gets the composed components (test_storage.pig reads them as key.$1)
+ {
+ setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
+ }
+ else
+ {
+ setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); // same marshaller the pre-patch code used to "set the key"; index 2 presumably = key validator — confirm
+ }
+ return tuple;
+ }
+
private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
@@ -825,6 +839,28 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return DoubleType.instance.decompose((Double)o);
if (o instanceof UUID)
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+ if(o instanceof Tuple) {
+ List<Object> objects = ((Tuple)o).getAll();
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
return ByteBuffer.wrap(((DataByteArray) o).get());
}