You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/10 20:45:50 UTC

[1/6] git commit: Pig support for CQL collections. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867

Updated Branches:
  refs/heads/cassandra-1.2 caef32e5d -> 70297f9ad
  refs/heads/cassandra-2.0 1ff0d8e9f -> 678aa37af
  refs/heads/trunk 382261218 -> 7246502e7


Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a

Branch: refs/heads/cassandra-1.2
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
  * Fix loading index summary containing empty key (CASSANDRA-5965)
  * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
 
 
 1.2.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.FLOAT;
         else if (type instanceof DoubleType)
             return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType )
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
             return DataType.TUPLE;
 
         return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
         if(o instanceof Tuple) {
             List<Object> objects = ((Tuple)o).getAll();
-            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-            int totalLength = 0;
-            for(Object sub : objects)
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
             {
-                ByteBuffer buffer = objToBB(sub);
-                serialized.add(buffer);
-                totalLength += 2 + buffer.remaining() + 1;
-            }
-            ByteBuffer out = ByteBuffer.allocate(totalLength);
-            for (ByteBuffer bb : serialized)
-            {
-                int length = bb.remaining();
-                out.put((byte) ((length >> 8) & 0xFF));
-                out.put((byte) (length & 0xFF));
-                out.put(bb);
-                out.put((byte) 0);
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+                   
             }
-            out.flip();
-            return out;
+            return objToCompositeBB(objects);
         }
 
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }      
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        } 
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     public void cleanupOnFailure(String failure, Job job)
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
                 if (columnValue != null)
                 {
                     IColumn column = new Column(cdef.name, columnValue);
-                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                 }
                 else
-                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                    tuple.set(i, null);
                 i++;
             }
             return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
         }
     }
 
+    /** set the value to the position of the tuple */
+    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof CollectionType)
+            setCollectionTupleValues(tuple, position, value, validator);
+        else
+           setTupleValue(tuple, position, value);
+    }
+
+    /** set the values of a collection (set/list, or map via setMapTupleValues) as one inner tuple at the position of the tuple */
+    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof MapType)
+        {
+            setMapTupleValues(tuple, position, value, validator);
+            return;
+        }
+        AbstractType<?> elementValidator;
+        if (validator instanceof SetType)
+            elementValidator = ((SetType<?>) validator).elements;
+        else if (validator instanceof ListType)
+            elementValidator = ((ListType<?>) validator).elements;
+        else 
+            return;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+        for (Object entry : (Collection<?>) value)
+        {
+            setTupleValue(innerTuple, i, entry, elementValidator);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** set the entries of a map as an inner tuple of (key, value) tuples at the position of the tuple */
+    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+        {
+            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+            setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+            setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+            innerTuple.set(i, mapEntryTuple);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** convert a cql column to an object */
+    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+    {
+        // standard
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(col.name()) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+        }
+        else
+            return validators.get(col.name()).compose(col.value());
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
 
                 // output prepared statement
                 if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+                    outputQuery = urlQuery.get("output_query");
 
                 // user defined where clause
                 if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
         String name = be.getLhs().toString();
         String value = be.getRhs().toString();
         OpType op = expression.getOpType();
-        String opString = op.name();
+        String opString = op.toString();
         switch (op)
         {
             case OP_EQ:


[3/6] git commit: Pig support for CQL collections. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867

Posted by br...@apache.org.
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a

Branch: refs/heads/trunk
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
  * Fix loading index summary containing empty key (CASSANDRA-5965)
  * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
 
 
 1.2.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.FLOAT;
         else if (type instanceof DoubleType)
             return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType )
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
             return DataType.TUPLE;
 
         return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
         if(o instanceof Tuple) {
             List<Object> objects = ((Tuple)o).getAll();
-            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-            int totalLength = 0;
-            for(Object sub : objects)
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
             {
-                ByteBuffer buffer = objToBB(sub);
-                serialized.add(buffer);
-                totalLength += 2 + buffer.remaining() + 1;
-            }
-            ByteBuffer out = ByteBuffer.allocate(totalLength);
-            for (ByteBuffer bb : serialized)
-            {
-                int length = bb.remaining();
-                out.put((byte) ((length >> 8) & 0xFF));
-                out.put((byte) (length & 0xFF));
-                out.put(bb);
-                out.put((byte) 0);
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+                   
             }
-            out.flip();
-            return out;
+            return objToCompositeBB(objects);
         }
 
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }      
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        } 
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     public void cleanupOnFailure(String failure, Job job)
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
                 if (columnValue != null)
                 {
                     IColumn column = new Column(cdef.name, columnValue);
-                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                 }
                 else
-                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                    tuple.set(i, null);
                 i++;
             }
             return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
         }
     }
 
+    /** set the value to the position of the tuple */
+    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof CollectionType)
+            setCollectionTupleValues(tuple, position, value, validator);
+        else
+           setTupleValue(tuple, position, value);
+    }
+
+    /** set the values of a collection (set/list, or map via setMapTupleValues) as one inner tuple at the position of the tuple */
+    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof MapType)
+        {
+            setMapTupleValues(tuple, position, value, validator);
+            return;
+        }
+        AbstractType<?> elementValidator;
+        if (validator instanceof SetType)
+            elementValidator = ((SetType<?>) validator).elements;
+        else if (validator instanceof ListType)
+            elementValidator = ((ListType<?>) validator).elements;
+        else 
+            return;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+        for (Object entry : (Collection<?>) value)
+        {
+            setTupleValue(innerTuple, i, entry, elementValidator);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** set the entries of a map as an inner tuple of (key, value) tuples at the position of the tuple */
+    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+        {
+            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+            setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+            setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+            innerTuple.set(i, mapEntryTuple);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** convert a cql column to an object */
+    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+    {
+        // standard
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(col.name()) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+        }
+        else
+            return validators.get(col.name()).compose(col.value());
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
 
                 // output prepared statement
                 if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+                    outputQuery = urlQuery.get("output_query");
 
                 // user defined where clause
                 if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
         String name = be.getLhs().toString();
         String value = be.getRhs().toString();
         OpType op = expression.getOpType();
-        String opString = op.name();
+        String opString = op.toString();
         switch (op)
         {
             case OP_EQ:


[2/6] git commit: Pig support for CQL collections. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867

Posted by br...@apache.org.
Pig support for CQL collections.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70297f9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70297f9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70297f9a

Branch: refs/heads/cassandra-2.0
Commit: 70297f9ad44d52cc9612cd91e7305969fc86e204
Parents: caef32e
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:28:23 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:28:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2328bf7..4d5b446 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
  * Fix loading index summary containing empty key (CASSANDRA-5965)
  * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+ * Pig: handle CQL collections (CASSANDRA-5867)
 
 
 1.2.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 59d7817..03805d2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -330,7 +330,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.FLOAT;
         else if (type instanceof DoubleType)
             return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType )
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
             return DataType.TUPLE;
 
         return DataType.BYTEARRAY;
@@ -401,30 +401,72 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
         if(o instanceof Tuple) {
             List<Object> objects = ((Tuple)o).getAll();
-            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-            int totalLength = 0;
-            for(Object sub : objects)
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
             {
-                ByteBuffer buffer = objToBB(sub);
-                serialized.add(buffer);
-                totalLength += 2 + buffer.remaining() + 1;
-            }
-            ByteBuffer out = ByteBuffer.allocate(totalLength);
-            for (ByteBuffer bb : serialized)
-            {
-                int length = bb.remaining();
-                out.put((byte) ((length >> 8) & 0xFF));
-                out.put((byte) (length & 0xFF));
-                out.put(bb);
-                out.put((byte) 0);
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+                   
             }
-            out.flip();
-            return out;
+            return objToCompositeBB(objects);
         }
 
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }      
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        } 
+        return CollectionType.pack(serialized, objects.size());
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     public void cleanupOnFailure(String failure, Job job)
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70297f9a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7e22823..a73e5a5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -35,6 +35,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
@@ -107,10 +108,11 @@ public class CqlStorage extends AbstractCassandraStorage
                 if (columnValue != null)
                 {
                     IColumn column = new Column(cdef.name, columnValue);
-                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                 }
                 else
-                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                    tuple.set(i, null);
                 i++;
             }
             return tuple;
@@ -121,6 +123,74 @@ public class CqlStorage extends AbstractCassandraStorage
         }
     }
 
+    /** set the value to the position of the tuple */
+    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof CollectionType)
+            setCollectionTupleValues(tuple, position, value, validator);
+        else
+           setTupleValue(tuple, position, value);
+    }
+
+    /** set the values of set/list at and after the position of the tuple */
+    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof MapType)
+        {
+            setMapTupleValues(tuple, position, value, validator);
+            return;
+        }
+        AbstractType<?> elementValidator;
+        if (validator instanceof SetType)
+            elementValidator = ((SetType<?>) validator).elements;
+        else if (validator instanceof ListType)
+            elementValidator = ((ListType<?>) validator).elements;
+        else 
+            return;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+        for (Object entry : (Collection<?>) value)
+        {
+            setTupleValue(innerTuple, i, entry, elementValidator);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** set the entries of a map, as an inner tuple of (key, value) tuples, at the position of the tuple */
+    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+        
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+        {
+            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+            setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+            setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+            innerTuple.set(i, mapEntryTuple);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** convert a cql column to an object */
+    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
+    {
+        // standard
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(col.name()) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+        }
+        else
+            return validators.get(col.name()).compose(col.value());
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -410,7 +480,7 @@ public class CqlStorage extends AbstractCassandraStorage
 
                 // output prepared statement
                 if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+                    outputQuery = urlQuery.get("output_query");
 
                 // user defined where clause
                 if (urlQuery.containsKey("where_clause"))
@@ -457,7 +527,7 @@ public class CqlStorage extends AbstractCassandraStorage
         String name = be.getLhs().toString();
         String value = be.getRhs().toString();
         OpType op = expression.getOpType();
-        String opString = op.name();
+        String opString = op.toString();
         switch (op)
         {
             case OP_EQ:


[4/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a

Branch: refs/heads/trunk
Commit: 678aa37af3322e805a3a639890e18391ce22426f
Parents: 1ff0d8e 70297f9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:44:47 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:44:47 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22fa74b,4d5b446..f9a3b80
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2
   * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
   * Fix loading index summary containing empty key (CASSANDRA-5965)
   * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+  * Pig: handle CQL collections (CASSANDRA-5867)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 + * Add stream session progress to JMX (CASSANDRA-4757)
 + * Fix NPE during CAS operation (CASSANDRA-5925)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table (CASSANRDA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
 + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
 + * cqlsh: add collections support to COPY (CASSANDRA-5698)
 + * retry important messages for any IOException (CASSANDRA-5804)
 + * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626)
 + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 84d7a7a,a73e5a5..2b76b83
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac
                  ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
                  if (columnValue != null)
                  {
 -                    IColumn column = new Column(cdef.name, columnValue);
 +                    Column column = new Column(cdef.name, columnValue);
-                     tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                     AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                     setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                  }
                  else
-                     tuple.set(i, TupleFactory.getInstance().newTuple());
+                     tuple.set(i, null);
                  i++;
              }
              return tuple;
@@@ -120,6 -123,74 +122,74 @@@
          }
      }
  
+     /** set the value to the position of the tuple */
+     private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof CollectionType)
+             setCollectionTupleValues(tuple, position, value, validator);
+         else
+            setTupleValue(tuple, position, value);
+     }
+ 
+     /** set the values of set/list at and after the position of the tuple */
+     private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof MapType)
+         {
+             setMapTupleValues(tuple, position, value, validator);
+             return;
+         }
+         AbstractType<?> elementValidator;
+         if (validator instanceof SetType)
+             elementValidator = ((SetType<?>) validator).elements;
+         else if (validator instanceof ListType)
+             elementValidator = ((ListType<?>) validator).elements;
+         else 
+             return;
+         
+         int i = 0;
+         Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+         for (Object entry : (Collection<?>) value)
+         {
+             setTupleValue(innerTuple, i, entry, elementValidator);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** set the entries of a map, as an inner tuple of (key, value) tuples, at the position of the tuple */
+     private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+         AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+         
+         int i = 0;
+         Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+         for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+         {
+             Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+             setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+             setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+             innerTuple.set(i, mapEntryTuple);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** convert a cql column to an object */
 -    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
++    private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+     {
+         // standard
+         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+         if (validators.get(col.name()) == null)
+         {
+             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+             return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+         }
+         else
+             return validators.get(col.name()).compose(col.value());
+     }
+ 
      /** set read configuration settings */
      public void setLocation(String location, Job job) throws IOException
      {
@@@ -455,8 -524,10 +525,8 @@@
      private String partitionFilterToWhereClauseString(Expression expression)
      {
          Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
 -        String name = be.getLhs().toString();
 -        String value = be.getRhs().toString();
          OpType op = expression.getOpType();
-         String opString = op.name();
+         String opString = op.toString();
          switch (op)
          {
              case OP_EQ:


[5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/678aa37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/678aa37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/678aa37a

Branch: refs/heads/cassandra-2.0
Commit: 678aa37af3322e805a3a639890e18391ce22426f
Parents: 1ff0d8e 70297f9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:44:47 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:44:47 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22fa74b,4d5b446..f9a3b80
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,54 -9,14 +25,55 @@@ Merged from 1.2
   * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
   * Fix loading index summary containing empty key (CASSANDRA-5965)
   * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
+  * Pig: handle CQL collections (CASSANDRA-5867)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 + * Add stream session progress to JMX (CASSANDRA-4757)
 + * Fix NPE during CAS operation (CASSANDRA-5925)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table (CASSANRDA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
 + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
 + * cqlsh: add collections support to COPY (CASSANDRA-5698)
 + * retry important messages for any IOException (CASSANDRA-5804)
 + * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626)
 + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/678aa37a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 84d7a7a,a73e5a5..2b76b83
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -105,11 -107,12 +106,12 @@@ public class CqlStorage extends Abstrac
                  ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
                  if (columnValue != null)
                  {
 -                    IColumn column = new Column(cdef.name, columnValue);
 +                    Column column = new Column(cdef.name, columnValue);
-                     tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                     AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                     setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                  }
                  else
-                     tuple.set(i, TupleFactory.getInstance().newTuple());
+                     tuple.set(i, null);
                  i++;
              }
              return tuple;
@@@ -120,6 -123,74 +122,74 @@@
          }
      }
  
+     /** set the value to the position of the tuple */
+     private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof CollectionType)
+             setCollectionTupleValues(tuple, position, value, validator);
+         else
+            setTupleValue(tuple, position, value);
+     }
+ 
+     /** set the values of set/list at and after the position of the tuple */
+     private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         if (validator instanceof MapType)
+         {
+             setMapTupleValues(tuple, position, value, validator);
+             return;
+         }
+         AbstractType<?> elementValidator;
+         if (validator instanceof SetType)
+             elementValidator = ((SetType<?>) validator).elements;
+         else if (validator instanceof ListType)
+             elementValidator = ((ListType<?>) validator).elements;
+         else 
+             return;
+         
+         int i = 0;
+         Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+         for (Object entry : (Collection<?>) value)
+         {
+             setTupleValue(innerTuple, i, entry, elementValidator);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** set the entries of a map, as an inner tuple of (key, value) tuples, at the position of the tuple */
+     private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+     {
+         AbstractType<?> keyValidator = ((MapType<?, ?>) validator).keys;
+         AbstractType<?> valueValidator = ((MapType<?, ?>) validator).values;
+         
+         int i = 0;
+         Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+         for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+         {
+             Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+             setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
+             setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+             innerTuple.set(i, mapEntryTuple);
+             i++;
+         }
+         tuple.set(position, innerTuple);
+     }
+ 
+     /** convert a cql column to an object */
 -    private Object cqlColumnToObj(IColumn col, CfDef cfDef) throws IOException
++    private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+     {
+         // standard
+         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+         if (validators.get(col.name()) == null)
+         {
+             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+             return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value());
+         }
+         else
+             return validators.get(col.name()).compose(col.value());
+     }
+ 
      /** set read configuration settings */
      public void setLocation(String location, Job job) throws IOException
      {
@@@ -455,8 -524,10 +525,8 @@@
      private String partitionFilterToWhereClauseString(Expression expression)
      {
          Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
 -        String name = be.getLhs().toString();
 -        String value = be.getRhs().toString();
          OpType op = expression.getOpType();
-         String opString = op.name();
+         String opString = op.toString();
          switch (op)
          {
              case OP_EQ:


[6/6] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7246502e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7246502e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7246502e

Branch: refs/heads/trunk
Commit: 7246502e7f6cd1e98b5199412541b621d7f12ffa
Parents: 3822612 678aa37
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 10 13:45:13 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 10 13:45:13 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/pig/AbstractCassandraStorage.java    | 78 +++++++++++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 78 +++++++++++++++++++-
 3 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7246502e/CHANGES.txt
----------------------------------------------------------------------