You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/29 21:43:11 UTC

[12/12] git commit: Pig: support for composite columns Patch by Janne Jalkanen, reviewed by brandonwilliams for CASSANDRA-3684

Pig: support for composite columns
Patch by Janne Jalkanen, reviewed by brandonwilliams for CASSANDRA-3684


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b61f1d43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b61f1d43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b61f1d43

Branch: refs/heads/cassandra-1.1
Commit: b61f1d43e74134b1c4fc27b68d27b59f9d3739d5
Parents: ebafaeb
Author: Brandon Williams <br...@apache.org>
Authored: Wed Feb 29 14:33:20 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Feb 29 14:33:20 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   43 ++++++++++++---
 contrib/pig/test/populate-cli.txt                  |   20 +++++++
 contrib/pig/test/test_storage.pig                  |   21 +++++++
 .../db/marshal/AbstractCompositeType.java          |   32 +++++++++++
 4 files changed, 109 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 6d1f76a..9c2dded 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.Deletion;
@@ -100,7 +102,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         this.limit = limit;
     }
 
-    public int getLimit() 
+    public int getLimit()
     {
         return limit;
     }
@@ -155,13 +157,37 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
     }
 
+    /**
+     *  Deconstructs the composite-encoded column {@code name} into a Tuple holding one composed value per component of {@code comparator}.
+     */
+    private Tuple composeComposite( AbstractCompositeType comparator, ByteBuffer name ) throws IOException
+    {
+        List<CompositeComponent> result = comparator.deconstruct( name );
+        // One tuple field per component; field i holds component i, in the order they appear in the name.
+        Tuple t = TupleFactory.getInstance().newTuple( result.size() );
+        // Compose each component with its own component type, not with the outer composite comparator.
+        for( int i = 0; i < result.size(); i++ )
+        {
+            setTupleValue( t, i, result.get(i).comparator.compose( result.get(i).value ) );
+        }
+
+        return t;
+    }
+
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
-        setTupleValue(pair, 0, comparator.compose(col.name()));
+        if( comparator instanceof AbstractCompositeType )
+        {
+            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
+        }
+        else
+        {
+            setTupleValue(pair, 0, comparator.compose(col.name()));
+        }
         if (col instanceof Column)
         {
             // standard
@@ -321,15 +347,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         if (System.getenv(PIG_RPC_PORT) != null)
             ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
-        else if (ConfigHelper.getRpcPort(conf) == 0) 
+        else if (ConfigHelper.getRpcPort(conf) == 0)
             throw new IOException("PIG_RPC_PORT environment variable not set");
         if (System.getenv(PIG_INITIAL_ADDRESS) != null)
             ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-        else if (ConfigHelper.getInitialAddress(conf) == null) 
+        else if (ConfigHelper.getInitialAddress(conf) == null)
             throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
         if (System.getenv(PIG_PARTITIONER) != null)
             ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        else if (ConfigHelper.getPartitioner(conf) == null) 
+        else if (ConfigHelper.getPartitioner(conf) == null)
             throw new IOException("PIG_PARTITIONER environment variable not set");
         if (System.getenv(PIG_ALLOW_DELETES) != null)
             allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES));
@@ -449,6 +475,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             return DataType.FLOAT;
         else if (type instanceof DoubleType)
             return DataType.DOUBLE;
+        else if (type instanceof AbstractCompositeType )
+            return DataType.TUPLE;
+
         return DataType.BYTEARRAY;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/contrib/pig/test/populate-cli.txt b/contrib/pig/test/populate-cli.txt
index 0164afe..c8124dd 100644
--- a/contrib/pig/test/populate-cli.txt
+++ b/contrib/pig/test/populate-cli.txt
@@ -86,3 +86,23 @@ incr CC['chuck']['kick'];
 incr CC['chuck']['kick'];
 incr CC['chuck']['kick'];
 incr CC['chuck']['fist'];
+
+create column family Compo
+    with key_validation_class = UTF8Type
+    and default_validation_class = UTF8Type
+    and comparator = 'CompositeType(UTF8Type,UTF8Type)';
+
+set Compo['punch']['bruce:lee'] = 'ouch';
+set Compo['punch']['bruce:bruce'] = 'hunh?';
+set Compo['kick']['bruce:lee'] = 'oww';
+set Compo['kick']['bruce:bruce'] = 'watch it, mate';
+
+create column family CompoInt
+    with key_validation_class = UTF8Type
+    and default_validation_class = UTF8Type
+    and comparator = 'CompositeType(LongType,LongType)';
+
+set CompoInt['clock']['1:0'] = 'z';
+set CompoInt['clock']['1:30'] = 'zzzz';
+set CompoInt['clock']['2:30'] = 'daddy?';
+set CompoInt['clock']['6:30'] = 'coffee...';

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/contrib/pig/test/test_storage.pig b/contrib/pig/test/test_storage.pig
index c49d4b3..a0157f7 100644
--- a/contrib/pig/test/test_storage.pig
+++ b/contrib/pig/test/test_storage.pig
@@ -47,3 +47,24 @@ CC = load 'cassandra://PigTest/CC' using CassandraStorage();
 total_hits = foreach CC generate key, SUM(columns.value);
 
 dump total_hits;
+
+--
+--  Test CompositeType
+--
+-- Compo's comparator is CompositeType(UTF8Type,UTF8Type), so each column name loads as a tuple.
+compo = load 'cassandra://PigTest/Compo' using CassandraStorage();
+
+compo = foreach compo generate key as method, flatten(columns);
+-- Select columns by comparing the composite name against a tuple literal.
+lee = filter compo by columns::name == ('bruce','lee');
+
+dump lee;
+-- CompoInt's names are (LongType,LongType) pairs, used below as (hour, minute).
+night = load 'cassandra://PigTest/CompoInt' using CassandraStorage();
+night = foreach night generate flatten(columns);
+night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;
+-- e.g. a '2:30' name gives hour = 2 + 30/60 = 2.5
+-- What happens at the darkest hour?
+darkest = filter night by hour > 2 and hour < 5;
+-- With the populate-cli.txt data, only the '2:30' row ('daddy?') falls strictly between 2 and 5.
+dump darkest;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 1ecc72e..e07807c 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -141,6 +141,38 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         return sb.toString();
     }
 
+    public static class CompositeComponent // one component of a composite name, as produced by deconstruct()
+    {
+        public AbstractType comparator; // this component's own type; comparator.compose(value) yields its Java value
+        public ByteBuffer   value;      // this component's bytes as read by getWithShortLength(); the trailing byte is not included
+
+        public CompositeComponent( AbstractType comparator, ByteBuffer value )
+        {
+            this.comparator = comparator;
+            this.value      = value;
+        }
+    }
+
+    public List<CompositeComponent> deconstruct( ByteBuffer bytes ) // splits a composite-encoded name into its components, in order
+    {
+        List<CompositeComponent> list = new ArrayList<CompositeComponent>();
+        // Read from a duplicate so the caller's buffer position is left untouched.
+        ByteBuffer bb = bytes.duplicate();
+        int i = 0;
+        // Each pass reads one component: its type, its value, then a single trailing byte.
+        while (bb.remaining() > 0)
+        {
+            AbstractType comparator = getNextComparator(i, bb);
+            ByteBuffer value = getWithShortLength(bb);
+
+            list.add( new CompositeComponent(comparator,value) );
+
+            byte b = bb.get(); // presumably the end-of-component marker; read only to advance to the next component (value unused)
+            ++i;
+        }
+        return list;
+    }
+
     /*
      * FIXME: this would break if some of the component string representation
      * contains ':'. None of our current comparator do so, so this is probably