You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/10 20:45:50 UTC
[1/6] git commit: Pig support for CQL collections. Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5867
Updated Branches:
refs/heads/cassandra-1.2 caef32e5d -> 70297f9ad
refs/heads/cassandra-2.0 1ff0d8e9f -> 678aa37af
refs/heads/trunk 382261218 -> 7246502e7
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a
Branch: refs/heads/cassandra-1.2
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
1.2.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType )
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
return DataType.TUPLE;
return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
if(o instanceof Tuple) {
List<Object> objects = ((Tuple)o).getAll();
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
{
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
}
- out.flip();
- return out;
+ return objToCompositeBB(objects);
}
return ByteBuffer.wrap(((DataByteArray) o).get());
}
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
public void cleanupOnFailure(String failure, Job job)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
if (columnValue != null)
{
IColumn column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
+ private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
// output prepared statement
if (urlQuery.containsKey("output_query"))
- outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+ outputQuery = urlQuery.get("output_query");
// user defined where clause
if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
String name = be.getLhs().toString();
String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ:
[3/6] git commit: Pig support for CQL collections. Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5867
Posted by br...@apache.org.
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a
Branch: refs/heads/trunk
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
1.2.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType )
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
return DataType.TUPLE;
return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
if(o instanceof Tuple) {
List<Object> objects = ((Tuple)o).getAll();
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
{
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
}
- out.flip();
- return out;
+ return objToCompositeBB(objects);
}
return ByteBuffer.wrap(((DataByteArray) o).get());
}
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
public void cleanupOnFailure(String failure, Job job)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
if (columnValue != null)
{
IColumn column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
+ private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
// output prepared statement
if (urlQuery.containsKey("output_query"))
- outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+ outputQuery = urlQuery.get("output_query");
// user defined where clause
if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
String name = be.getLhs().toString();
String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ:
[2/6] git commit: Pig support for CQL collections. Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5867
Posted by br...@apache.org.
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a
Branch: refs/heads/cassandra-2.0
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
1.2.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.FLOAT;
else if (type instanceof DoubleType)
return DataType.DOUBLE;
- else if (type instanceof AbstractCompositeType )
+ else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
return DataType.TUPLE;
return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
if(o instanceof Tuple) {
List<Object> objects = ((Tuple)o).getAll();
- List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
- int totalLength = 0;
- for(Object sub : objects)
+ //collections
+ if (objects.size() > 0 && objects.get(0) instanceof String)
{
- ByteBuffer buffer = objToBB(sub);
- serialized.add(buffer);
- totalLength += 2 + buffer.remaining() + 1;
- }
- ByteBuffer out = ByteBuffer.allocate(totalLength);
- for (ByteBuffer bb : serialized)
- {
- int length = bb.remaining();
- out.put((byte) ((length >> 8) & 0xFF));
- out.put((byte) (length & 0xFF));
- out.put(bb);
- out.put((byte) 0);
+ String collectionType = (String) objects.get(0);
+ if ("set".equalsIgnoreCase(collectionType) ||
+ "list".equalsIgnoreCase(collectionType))
+ return objToListOrSetBB(objects.subList(1, objects.size()));
+ else if ("map".equalsIgnoreCase(collectionType))
+ return objToMapBB(objects.subList(1, objects.size()));
+
}
- out.flip();
- return out;
+ return objToCompositeBB(objects);
}
return ByteBuffer.wrap(((DataByteArray) o).get());
}
+ private ByteBuffer objToListOrSetBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToMapBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+ for(Object sub : objects)
+ {
+ List<Object> keyValue = ((Tuple)sub).getAll();
+ for (Object entry: keyValue)
+ {
+ ByteBuffer buffer = objToBB(entry);
+ serialized.add(buffer);
+ }
+ }
+ return CollectionType.pack(serialized, objects.size());
+ }
+
+ private ByteBuffer objToCompositeBB(List<Object> objects)
+ {
+ List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+ int totalLength = 0;
+ for(Object sub : objects)
+ {
+ ByteBuffer buffer = objToBB(sub);
+ serialized.add(buffer);
+ totalLength += 2 + buffer.remaining() + 1;
+ }
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer bb : serialized)
+ {
+ int length = bb.remaining();
+ out.put((byte) ((length >> 8) & 0xFF));
+ out.put((byte) (length & 0xFF));
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
public void cleanupOnFailure(String failure, Job job)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
if (columnValue != null)
{
IColumn column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
+ private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
// output prepared statement
if (urlQuery.containsKey("output_query"))
- outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+ outputQuery = urlQuery.get("output_query");
// user defined where clause
if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
String name = be.getLhs().toString();
String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ:
[4/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a
Branch: refs/heads/trunk
Commit: 678aa37af3322e805a3a639890e18391ce22426f
Parents: 1ff0d8e 70297f9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:44:47 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:44:47 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22fa74b,4d5b446..f9a3b80
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
-1.2.9
+2.0.0
+ * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
+ * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
+ * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
+ * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
+ * Add stream session progress to JMX (CASSANDRA-4757)
+ * Fix NPE during CAS operation (CASSANDRA-5925)
+Merged from 1.2:
* Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
- * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
- (CASSANDRA-5800)
- * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
+ * Don't announce schema version until we've loaded the changes locally
+ (CASSANDRA-5904)
+ * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
+ * Properly handle parsing huge map and set literals (CASSANDRA-5893)
+
+
+2.0.0-rc2
+ * enable vnodes by default (CASSANDRA-5869)
+ * fix CAS contention timeout (CASSANDRA-5830)
+ * fix HsHa to respect max frame size (CASSANDRA-4573)
+ * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
+ * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
+Merged from 1.2:
+ * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
+ * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
+ * cqlsh: add support for multiline comments (CASSANDRA-5798)
+ * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
+ (CASSANDRA-5856)
+
+
+2.0.0-rc1
+ * improve DecimalSerializer performance (CASSANDRA-5837)
+ * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
+ * fix schema-related trigger issues (CASSANDRA-5774)
+ * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
+ * Fix assertion error during repair (CASSANDRA-5801)
+ * Fix range tombstone bug (CASSANDRA-5805)
+ * DC-local CAS (CASSANDRA-5797)
+ * Add a native_protocol_version column to the system.local table (CASSANDRA-5819)
+ * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
+ * Fix buffer underflow on socket close (CASSANDRA-5792)
+Merged from 1.2:
+ * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
+ * cqlsh: add collections support to COPY (CASSANDRA-5698)
+ * retry important messages for any IOException (CASSANDRA-5804)
+ * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626)
+ * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
* fix bulk-loading compressed sstables (CASSANDRA-5820)
* (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter
(CASSANDRA-5824)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 84d7a7a,a73e5a5..2b76b83
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac
ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
if (columnValue != null)
{
- IColumn column = new Column(cdef.name, columnValue);
+ Column column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@@ -120,6 -123,74 +122,74 @@@
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
- private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
++ private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@@ -455,8 -524,10 +525,8 @@@
private String partitionFilterToWhereClauseString(Expression expression)
{
Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
- String name = be.getLhs().toString();
- String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ:
[5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a
Branch: refs/heads/cassandra-2.0
Commit: 678aa37af3322e805a3a639890e18391ce22426f
Parents: 1ff0d8e 70297f9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:44:47 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:44:47 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22fa74b,4d5b446..f9a3b80
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2
* Fix streaming does not transfer wrapped range (CASSANDRA-5948)
* Fix loading index summary containing empty key (CASSANDRA-5965)
* Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
-1.2.9
+2.0.0
+ * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
+ * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
+ * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
+ * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
+ * Add stream session progress to JMX (CASSANDRA-4757)
+ * Fix NPE during CAS operation (CASSANDRA-5925)
+Merged from 1.2:
* Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
- * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
- (CASSANDRA-5800)
- * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
+ * Don't announce schema version until we've loaded the changes locally
+ (CASSANDRA-5904)
+ * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
+ * Properly handle parsing huge map and set literals (CASSANDRA-5893)
+
+
+2.0.0-rc2
+ * enable vnodes by default (CASSANDRA-5869)
+ * fix CAS contention timeout (CASSANDRA-5830)
+ * fix HsHa to respect max frame size (CASSANDRA-4573)
+ * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
+ * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
+Merged from 1.2:
+ * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
+ * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
+ * cqlsh: add support for multiline comments (CASSANDRA-5798)
+ * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
+ (CASSANDRA-5856)
+
+
+2.0.0-rc1
+ * improve DecimalSerializer performance (CASSANDRA-5837)
+ * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
+ * fix schema-related trigger issues (CASSANDRA-5774)
+ * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
+ * Fix assertion error during repair (CASSANDRA-5801)
+ * Fix range tombstone bug (CASSANDRA-5805)
+ * DC-local CAS (CASSANDRA-5797)
+ * Add a native_protocol_version column to the system.local table (CASSANDRA-5819)
+ * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
+ * Fix buffer underflow on socket close (CASSANDRA-5792)
+Merged from 1.2:
+ * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
+ * cqlsh: add collections support to COPY (CASSANDRA-5698)
+ * retry important messages for any IOException (CASSANDRA-5804)
+ * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626)
+ * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
* fix bulk-loading compressed sstables (CASSANDRA-5820)
* (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter
(CASSANDRA-5824)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 84d7a7a,a73e5a5..2b76b83
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac
ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
if (columnValue != null)
{
- IColumn column = new Column(cdef.name, columnValue);
+ Column column = new Column(cdef.name, columnValue);
- tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
- tuple.set(i, TupleFactory.getInstance().newTuple());
+ tuple.set(i, null);
i++;
}
return tuple;
@@@ -120,6 -123,74 +122,74 @@@
}
}
+ /** set the value to the position of the tuple */
+ private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof CollectionType)
+ setCollectionTupleValues(tuple, position, value, validator);
+ else
+ setTupleValue(tuple, position, value);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ if (validator instanceof MapType)
+ {
+ setMapTupleValues(tuple, position, value, validator);
+ return;
+ }
+ AbstractType<?> elementValidator;
+ if (validator instanceof SetType)
+ elementValidator = ((SetType<?>) validator).elements;
+ else if (validator instanceof ListType)
+ elementValidator = ((ListType<?>) validator).elements;
+ else
+ return;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+ for (Object entry : (Collection<?>) value)
+ {
+ setTupleValue(innerTuple, i, entry, elementValidator);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** set the values of set/list at and after the position of the tuple */
+ private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ {
+ AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+ AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+
+ int i = 0;
+ Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+ for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
+ {
+ Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+ setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+ setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ innerTuple.set(i, mapEntryTuple);
+ i++;
+ }
+ tuple.set(position, innerTuple);
+ }
+
+ /** convert a cql column to an object */
- private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
++ private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+ {
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+ }
+ else
+ return validators.get(col.name()).compose(col.value());
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@@ -455,8 -524,10 +525,8 @@@
private String partitionFilterToWhereClauseString(Expression expression)
{
Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
- String name = be.getLhs().toString();
- String value = be.getRhs().toString();
OpType op = expression.getOpType();
- String opString = op.name();
+ String opString = op.toString();
switch (op)
{
case OP_EQ:
[6/6] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7246502e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7246502e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7246502e
Branch: refs/heads/trunk
Commit: 7246502e7f6cd1e98b5199412541b621d7f12ffa
Parents: 3822612 678aa37
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:45:13 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:45:13 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/pig/AbstractCassandraStorage.java | 78 +++++++++++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7246502e/CHANGES.txt
----------------------------------------------------------------------