You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/13 17:43:36 UTC

[3/3] git commit: Pig: support for composite row keys, writing composites Patch by Dirkjan Bussink, reviewed by brandonwilliams for CASSANDRA-4144

Pig: support for composite row keys, writing composites
Patch by Dirkjan Bussink, reviewed by brandonwilliams for CASSANDRA-4144


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f6cc5ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f6cc5ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f6cc5ef

Branch: refs/heads/cassandra-1.1
Commit: 7f6cc5ef6cad831061dc5cb75f5623f949ab7ca2
Parents: 962b23b
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jul 13 10:38:15 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jul 13 10:38:15 2012 -0500

----------------------------------------------------------------------
 examples/pig/test/populate-cli.txt                 |   20 +++++++
 examples/pig/test/test_storage.pig                 |   17 +++++-
 .../cassandra/hadoop/pig/CassandraStorage.java     |   44 +++++++++++++-
 3 files changed, 76 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/examples/pig/test/populate-cli.txt b/examples/pig/test/populate-cli.txt
index 7ec6cd6..1f59642 100644
--- a/examples/pig/test/populate-cli.txt
+++ b/examples/pig/test/populate-cli.txt
@@ -112,3 +112,23 @@ set CompoInt['clock']['1:0'] = 'z';
 set CompoInt['clock']['1:30'] = 'zzzz';
 set CompoInt['clock']['2:30'] = 'daddy?';
 set CompoInt['clock']['6:30'] = 'coffee...';
+
+create column family CompoIntCopy
+    with key_validation_class = UTF8Type
+    and default_validation_class = UTF8Type
+    and comparator = 'CompositeType(LongType,LongType)';
+
+create column family CompoKey
+    with key_validation_class = 'CompositeType(UTF8Type,LongType)'
+    and default_validation_class = UTF8Type
+    and comparator = LongType;
+
+set CompoKey['clock:10']['1'] = 'z';
+set CompoKey['clock:20']['1'] = 'zzzz';
+set CompoKey['clock:30']['2'] = 'daddy?';
+set CompoKey['clock:40']['6'] = 'coffee...';
+
+create column family CompoKeyCopy
+    with key_validation_class = 'CompositeType(UTF8Type,LongType)'
+    and default_validation_class = UTF8Type
+    and comparator = LongType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index a0157f7..026cb02 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -67,4 +67,19 @@ night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60
 -- What happens at the darkest hour?
 darkest = filter night by hour > 2 and hour < 5;
 
-dump darkest;
\ No newline at end of file
+dump darkest;
+
+compo_int_rows = LOAD 'cassandra://PigTest/CompoInt' USING CassandraStorage();
+STORE compo_int_rows INTO 'cassandra://PigTest/CompoIntCopy' USING CassandraStorage();
+
+--
+--  Test CompositeKey
+--
+
+compokeys = load 'cassandra://PigTest/CompoKey' using CassandraStorage();
+compokeys = filter compokeys by key.$1 == 40;
+
+dump compokeys;
+
+compo_key_rows = LOAD 'cassandra://PigTest/CompoKey' USING CassandraStorage();
+STORE compo_key_rows INTO 'cassandra://PigTest/CompoKeyCopy' USING CassandraStorage();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 5742cb9..454330c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -131,7 +131,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         return limit;
     }
-    
+
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -223,10 +223,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
             // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
             // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
-            Tuple tuple = TupleFactory.getInstance().newTuple(1);
+
+            Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
             DefaultDataBag bag = new DefaultDataBag();
-            // set the key
-            setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key));
+
             // we must add all the indexed columns first to match the schema
             Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
             // take care to iterate these in the same order as the schema does
@@ -283,6 +283,20 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return t;
     }
 
+    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException // builds the 1-field tuple that carries the row key
+    {
+        Tuple tuple = TupleFactory.getInstance().newTuple(1); // sized 1 so slot 0 can be filled via setTupleValue
+        if( comparator instanceof AbstractCompositeType ) // NOTE(review): despite the name, the visible caller passes the parsed key_validation_class here
+        {
+            setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); // composite key: slot 0 holds composeComposite's result (presumably a tuple of the components -- confirm)
+        }
+        else
+        {
+            setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); // non-composite key: same index-2 marshaller the caller used inline before this change
+        }
+        return tuple;
+    }
+
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
@@ -825,6 +839,28 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             return DoubleType.instance.decompose((Double)o);
         if (o instanceof UUID)
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if(o instanceof Tuple) {
+            List<Object> objects = ((Tuple)o).getAll();
+            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+            int totalLength = 0;
+            for(Object sub : objects)
+            {
+                ByteBuffer buffer = objToBB(sub);
+                serialized.add(buffer);
+                totalLength += 2 + buffer.remaining() + 1;
+            }
+            ByteBuffer out = ByteBuffer.allocate(totalLength);
+            for (ByteBuffer bb : serialized)
+            {
+                int length = bb.remaining();
+                out.put((byte) ((length >> 8) & 0xFF));
+                out.put((byte) (length & 0xFF));
+                out.put(bb);
+                out.put((byte) 0);
+            }
+            out.flip();
+            return out;
+        }
 
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }