You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/29 21:43:11 UTC
[12/12] git commit: Pig: support for composite columns Patch by Janne
Jalkanen, reviewed by brandonwilliams for CASSANDRA-3684
Pig: support for composite columns
Patch by Janne Jalkanen, reviewed by brandonwilliams for CASSANDRA-3684
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b61f1d43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b61f1d43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b61f1d43
Branch: refs/heads/cassandra-1.1
Commit: b61f1d43e74134b1c4fc27b68d27b59f9d3739d5
Parents: ebafaeb
Author: Brandon Williams <br...@apache.org>
Authored: Wed Feb 29 14:33:20 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Feb 29 14:33:20 2012 -0600
----------------------------------------------------------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 43 ++++++++++++---
contrib/pig/test/populate-cli.txt | 20 +++++++
contrib/pig/test/test_storage.pig | 21 +++++++
.../db/marshal/AbstractCompositeType.java | 32 +++++++++++
4 files changed, 109 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 6d1f76a..9c2dded 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.TypeParser;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.Deletion;
@@ -100,7 +102,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
this.limit = limit;
}
- public int getLimit()
+ public int getLimit()
{
return limit;
}
@@ -155,13 +157,37 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
+    /**
+     * Converts a composite column name into a Pig Tuple holding one field per
+     * component. Each component is decoded with its own comparator, as reported
+     * by {@link AbstractCompositeType#deconstruct(ByteBuffer)}, and stored via
+     * setTupleValue at the component's position.
+     */
+    private Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+    {
+        List<CompositeComponent> components = comparator.deconstruct(name);
+        Tuple tuple = TupleFactory.getInstance().newTuple(components.size());
+        int position = 0;
+        for (CompositeComponent component : components)
+            setTupleValue(tuple, position++, component.comparator.compose(component.value));
+        return tuple;
+    }
+
private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- setTupleValue(pair, 0, comparator.compose(col.name()));
+ if( comparator instanceof AbstractCompositeType )
+ {
+ setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
+ }
+ else
+ {
+ setTupleValue(pair, 0, comparator.compose(col.name()));
+ }
if (col instanceof Column)
{
// standard
@@ -321,15 +347,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
if (System.getenv(PIG_RPC_PORT) != null)
ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
- else if (ConfigHelper.getRpcPort(conf) == 0)
+ else if (ConfigHelper.getRpcPort(conf) == 0)
throw new IOException("PIG_RPC_PORT environment variable not set");
if (System.getenv(PIG_INITIAL_ADDRESS) != null)
ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- else if (ConfigHelper.getInitialAddress(conf) == null)
+ else if (ConfigHelper.getInitialAddress(conf) == null)
throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
if (System.getenv(PIG_PARTITIONER) != null)
ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
- else if (ConfigHelper.getPartitioner(conf) == null)
+ else if (ConfigHelper.getPartitioner(conf) == null)
throw new IOException("PIG_PARTITIONER environment variable not set");
if (System.getenv(PIG_ALLOW_DELETES) != null)
allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES));
@@ -449,6 +475,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
+ else if (type instanceof AbstractCompositeType )
+ return DataType.TUPLE;
+
return DataType.BYTEARRAY;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/contrib/pig/test/populate-cli.txt b/contrib/pig/test/populate-cli.txt
index 0164afe..c8124dd 100644
--- a/contrib/pig/test/populate-cli.txt
+++ b/contrib/pig/test/populate-cli.txt
@@ -86,3 +86,23 @@ incr CC['chuck']['kick'];
incr CC['chuck']['kick'];
incr CC['chuck']['kick'];
incr CC['chuck']['fist'];
+
+create column family Compo
+ with key_validation_class = UTF8Type
+ and default_validation_class = UTF8Type
+ and comparator = 'CompositeType(UTF8Type,UTF8Type)';
+
+set Compo['punch']['bruce:lee'] = 'ouch';
+set Compo['punch']['bruce:bruce'] = 'hunh?';
+set Compo['kick']['bruce:lee'] = 'oww';
+set Compo['kick']['bruce:bruce'] = 'watch it, mate';
+
+create column family CompoInt
+ with key_validation_class = UTF8Type
+ and default_validation_class = UTF8Type
+ and comparator = 'CompositeType(LongType,LongType)';
+
+set CompoInt['clock']['1:0'] = 'z';
+set CompoInt['clock']['1:30'] = 'zzzz';
+set CompoInt['clock']['2:30'] = 'daddy?';
+set CompoInt['clock']['6:30'] = 'coffee...';
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/contrib/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/contrib/pig/test/test_storage.pig b/contrib/pig/test/test_storage.pig
index c49d4b3..a0157f7 100644
--- a/contrib/pig/test/test_storage.pig
+++ b/contrib/pig/test/test_storage.pig
@@ -47,3 +47,24 @@ CC = load 'cassandra://PigTest/CC' using CassandraStorage();
total_hits = foreach CC generate key, SUM(columns.value);
dump total_hits;
+
+--
+-- Test CompositeType
+--
+-- Compo's comparator is CompositeType(UTF8Type,UTF8Type) (see populate-cli.txt); CassandraStorage loads each such column name as a tuple of its components.
+compo = load 'cassandra://PigTest/Compo' using CassandraStorage();
+-- Rename the row key to 'method' and flatten columns so each column becomes its own record (columns::name / columns::value).
+compo = foreach compo generate key as method, flatten(columns);
+-- Keep only columns whose composite name is the tuple ('bruce','lee'); the fixture sets one in row 'punch' and one in row 'kick'.
+lee = filter compo by columns::name == ('bruce','lee');
+
+dump lee;
+-- CompoInt's comparator is CompositeType(LongType,LongType); given the /60 below, names like '2:30' appear to be (hour, minute) pairs.
+night = load 'cassandra://PigTest/CompoInt' using CassandraStorage();
+night = foreach night generate flatten(columns);
+night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;
+
+-- What happens at the darkest hour?
+darkest = filter night by hour > 2 and hour < 5;
+-- With the populate-cli.txt fixture (1:0, 1:30, 2:30, 6:30) only the '2:30' column ('daddy?') should fall in this window.
+dump darkest;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b61f1d43/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 1ecc72e..e07807c 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -141,6 +141,38 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
return sb.toString();
}
+    /**
+     * A single component of a composite value: the component's raw bytes
+     * paired with the comparator that should be used to interpret them.
+     */
+    public static class CompositeComponent
+    {
+        /** Comparator/type for this component; callers use it to decode {@link #value}. */
+        public AbstractType comparator;
+
+        /** The component's serialized bytes. */
+        public ByteBuffer value;
+
+        public CompositeComponent(AbstractType componentType, ByteBuffer componentBytes)
+        {
+            comparator = componentType;
+            value = componentBytes;
+        }
+    }
+
+    /**
+     * Splits a serialized composite value into its individual components.
+     *
+     * Each component is returned together with the comparator that
+     * getNextComparator selects for its position, so callers can decode the
+     * component's bytes themselves. The caller's buffer position is left
+     * untouched because a duplicate of {@code bytes} is consumed instead.
+     *
+     * @param bytes the serialized composite value
+     * @return the components in serialization order; empty if {@code bytes}
+     *         has no remaining data
+     */
+    public List<CompositeComponent> deconstruct(ByteBuffer bytes)
+    {
+        List<CompositeComponent> list = new ArrayList<CompositeComponent>();
+        ByteBuffer bb = bytes.duplicate();
+        for (int i = 0; bb.hasRemaining(); ++i)
+        {
+            // getNextComparator receives the buffer itself; whether it consumes
+            // bytes from it depends on the concrete subclass.
+            AbstractType comparator = getNextComparator(i, bb);
+            ByteBuffer value = getWithShortLength(bb);
+            list.add(new CompositeComponent(comparator, value));
+            // Advance past the byte that follows each component's value
+            // (presumably the end-of-component marker); its content is not
+            // needed to recover the components.
+            bb.get();
+        }
+        return list;
+    }
+
/*
* FIXME: this would break if some of the component string representation
* contains ':'. None of our current comparator do so, so this is probably