You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/10 20:45:52 UTC
[3/6] git commit: Pig support for CQL collections. Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5867
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a
Branch: refs/heads/trunk
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
1.2.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType )
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
return DataType.TUPLE;
return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
if(o instanceof Tuple) {
List<Object> objects = ((Tuple)o).getAll();
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
+ // collections: a tuple whose first field is the string "set", "list" or "map" is serialized as a collection
+ if (objects.size() > 0 && objects.get(0) instanceof String)
{
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
}
- out.flip();
- return out;
+ return objToCompositeBB(objects);
}
return ByteBuffer.wrap(((DataByteArray) o).get());
}
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
public void cleanupOnFailure(String failure, Job job)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
if (columnValue != null)
{
IColumn column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of a set/list (or, via setMapTupleValues, a map) as an inner tuple at the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the entries of a map, each as a (key, value) tuple, in an inner tuple at the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
+ private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+ {
+ // use the column's own validator; fall back to the default validator when none is defined
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
// output prepared statement
if (urlQuery.containsKey("output_query"))
- outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+ outputQuery = urlQuery.get("output_query");
// user defined where clause
if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
String name = be.getLhs().toString();
String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ: