You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/10/12 00:43:23 UTC

[1/5] git commit: Fix int/bigint in CassandraStorage Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102

Updated Branches:
  refs/heads/cassandra-2.0 a3ad2e822 -> e5dba3c62
  refs/heads/trunk bc8e2475f -> b89cce9c8


Fix int/bigint in CassandraStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639c01a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639c01a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639c01a3

Branch: refs/heads/trunk
Commit: 639c01a3504ba2e2a55061093651a9973ad68d11
Parents: eee485e
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 15:22:35 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 15:22:35 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 82 +++++++++++++-------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 45 ++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  8 +-
 3 files changed, 85 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 6ad4f9e..dbebfb5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected String outputFormatClass;
     protected int splitSize = 64 * 1024;
     protected String partitionerClass;
-    protected boolean usePartitionFilter = false; 
+    protected boolean usePartitionFilter = false;
 
     public AbstractCassandraStorage()
     {
@@ -116,8 +116,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
     {
+        CfDef cfDef = cfInfo.cfDef;
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
         // name
@@ -131,13 +132,21 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         {
             // standard
             Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-            if (validators.get(col.name()) == null)
+            ByteBuffer colName;
+            if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+            {
+                ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+                colName = names[names.length-1];
+            }
+            else
+                colName = col.name();
+            if (validators.get(colName) == null)
             {
                 Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
                 setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
             }
             else
-                setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+                setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
             return pair;
         }
         else
@@ -145,7 +154,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             // super
             ArrayList<Tuple> subcols = new ArrayList<Tuple>();
             for (IColumn subcol : col.getSubColumns())
-                subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+                subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
 
             pair.set(1, new DefaultDataBag(subcols));
         }
@@ -168,11 +177,16 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** get the columnfamily definition for the signature */
-    protected CfDef getCfDef(String signature) throws IOException
+    protected CfInfo getCfInfo(String signature) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        return cfdefFromString(property.getProperty(signature));
+        String prop = property.getProperty(signature);
+        CfInfo cfInfo = new CfInfo();
+        cfInfo.cfDef = cfdefFromString(prop.substring(2));
+        cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
+        cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
+        return cfInfo;
     }
 
     /** construct a map to store the mashaller type to cassandra data type mapping */
@@ -329,10 +343,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.INTEGER;
         else if (type instanceof AsciiType || 
                 type instanceof UTF8Type ||
-                type instanceof DecimalType ||
-                type instanceof InetAddressType ||
-                type instanceof LexicalUUIDType ||
-                type instanceof UUIDType )
+                type instanceof DecimalType || type instanceof InetAddressType )
             return DataType.CHARARRAY;
         else if (type instanceof FloatType)
             return DataType.FLOAT;
@@ -513,11 +524,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                     }
                 }
 
-                // compose the CfDef for the columfamily
-                CfDef cfDef = getCfDef(client);
+                // compose the CfInfo for the columfamily
+                CfInfo cfInfo = getCfInfo(client);
 
-                if (cfDef != null)
-                    properties.setProperty(signature, cfdefToString(cfDef));
+                if (cfInfo.cfDef != null)
+                {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
+                    properties.setProperty(signature, sb.toString());
+                }
                 else
                     throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
                                                              column_family,
@@ -563,17 +578,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return cfDef;
     }
 
-    /** return the CfDef for the column family */
-    protected CfDef getCfDef(Cassandra.Client client)
+    /** return the CfInf for the column family */
+    protected CfInfo getCfInfo(Cassandra.Client client)
             throws InvalidRequestException,
                    UnavailableException,
                    TimedOutException,
                    SchemaDisagreementException,
                    TException,
-                   CharacterCodingException,
                    NotFoundException,
                    org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException
+                   ConfigurationException,
+                   IOException
     {
         // get CF meta data
         String query = "SELECT type," +
@@ -627,12 +642,19 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             else
                 cql3Table = true;
         }
-        cfDef.column_metadata = getColumnMetadata(client, cql3Table);
-        return cfDef;
+        cfDef.column_metadata = getColumnMetadata(client);
+        CfInfo cfInfo = new CfInfo();
+        cfInfo.cfDef = cfDef;
+        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
+            cfInfo.compactCqlTable = true;
+
+        if (cql3Table)
+            cfInfo.cql3Table = true;
+        return cfInfo;
     }
 
     /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
@@ -749,7 +771,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     /** get a list of columns with defined index*/
     protected List<ColumnDef> getIndexes() throws IOException
     {
-        CfDef cfdef = getCfDef(loadSignature);
+        CfDef cfdef = getCfInfo(loadSignature).cfDef;
         List<ColumnDef> indexes = new ArrayList<ColumnDef>();
         for (ColumnDef cdef : cfdef.column_metadata)
         {
@@ -778,15 +800,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
     protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
     {
-        if (validator instanceof DecimalType ||
-                validator instanceof InetAddressType ||
-                validator instanceof LexicalUUIDType ||
-                validator instanceof UUIDType)
-        {
+        if (validator instanceof DecimalType || validator instanceof InetAddressType)
             return validator.getString(value);
-        }
         else
             return validator.compose(value);
     }
+
+    protected class CfInfo
+    {
+        boolean compactCqlTable = false;
+        boolean cql3Table = false;
+        CfDef cfDef;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1135b70..a7cc1ad 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -101,7 +101,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfDef cfDef = getCfDef(loadSignature);
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -124,7 +125,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -162,7 +163,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -179,14 +180,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -208,7 +209,8 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef(loadSignature);
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -224,11 +226,21 @@ public class CassandraStorage extends AbstractCassandraStorage
             // take care to iterate these in the same order as the schema does
             for (ColumnDef cdef : cfDef.column_metadata)
             {
-                if (cf.containsKey(cdef.name))
+                boolean hasColumn = false;
+                boolean cql3Table = false;
+                try
                 {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+                    hasColumn = cf.containsKey(cdef.name);
                 }
-                else
+                catch (Exception e)
+                {
+                    cql3Table = true;                  
+                }
+                if (hasColumn)
+                {
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+                }
+                else if (!cql3Table)
                 {   // otherwise, we need to add an empty tuple to take its place
                     tuple.append(TupleFactory.getInstance().newTuple());
                 }
@@ -238,7 +250,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -246,7 +258,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -356,7 +368,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfDef cfDef = getCfDef(loadSignature);
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
 
         if (cfDef.column_type.equals("Super"))
             return null;
@@ -403,7 +416,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows)
+        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -697,7 +710,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     }
 
     /** get a list of column for the column family */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table) 
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) 
             throws InvalidRequestException, 
             UnavailableException, 
             TimedOutException, 
@@ -708,9 +721,6 @@ public class CassandraStorage extends AbstractCassandraStorage
             ConfigurationException,
             NotFoundException
     {
-        if (cql3Table)
-            return new ArrayList<ColumnDef>();
-        
         return getColumnMeta(client, true, true);
     }
 
@@ -795,6 +805,5 @@ public class CassandraStorage extends AbstractCassandraStorage
             		                        "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
         }
     }
-
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index e51338c..7e1f56c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -95,7 +95,8 @@ public class CqlStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef(loadSignature);
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
             Map<String, ByteBuffer> keys = reader.getCurrentKey();
             Map<String, ByteBuffer> columns = reader.getCurrentValue();
             assert keys != null && columns != null;
@@ -281,7 +282,8 @@ public class CqlStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfDef cfDef = getCfDef(loadSignature);
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
 
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
@@ -429,7 +431,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
     
     /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,


[5/5] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b89cce9c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b89cce9c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b89cce9c

Branch: refs/heads/trunk
Commit: b89cce9c8583d869cb3ca6fd26b4a9b47d71f108
Parents: bc8e247 e5dba3c
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:45 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:45 2013 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[4/5] git commit: Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"

Posted by br...@apache.org.
Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"

This reverts commit a3ad2e82249b88d4a05f24140948cdbc809d14f3, reversing
changes made to 8a506e66a66c004a7a253e3dd28845517da8a967.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5dba3c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5dba3c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5dba3c6

Branch: refs/heads/cassandra-2.0
Commit: e5dba3c6250eb7285777490a937b66a7092afa77
Parents: a3ad2e8
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:22 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:22 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 97 ++++++--------------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 55 ++++-------
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  8 +-
 3 files changed, 51 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 486c781..c881734 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected String outputFormatClass;
     protected int splitSize = 64 * 1024;
     protected String partitionerClass;
-    protected boolean usePartitionFilter = false;
+    protected boolean usePartitionFilter = false; 
 
     public AbstractCassandraStorage()
     {
@@ -116,9 +116,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
     {
-        CfDef cfDef = cfInfo.cfDef;
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
         // name
@@ -131,34 +130,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         if (validators.get(col.name()) == null)
         {
-            // standard
-            Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-            ByteBuffer colName;
-            if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
-            {
-                ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
-                colName = names[names.length-1];
-            }
-            else
-                colName = col.name();
-            if (validators.get(colName) == null)
-            {
-                Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-                setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
-            }
-            else
-                setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
-            return pair;
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
         }
         else
-        {
-            // super
-            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-            for (IColumn subcol : col.getSubColumns())
-                subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
-
-            pair.set(1, new DefaultDataBag(subcols));
-        }
+            setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
         return pair;
     }
 
@@ -178,16 +154,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** get the columnfamily definition for the signature */
-    protected CfInfo getCfInfo(String signature) throws IOException
+    protected CfDef getCfDef(String signature) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        String prop = property.getProperty(signature);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfdefFromString(prop.substring(2));
-        cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
-        cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
-        return cfInfo;
+        return cfdefFromString(property.getProperty(signature));
     }
 
     /** construct a map to store the mashaller type to cassandra data type mapping */
@@ -344,7 +315,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.INTEGER;
         else if (type instanceof AsciiType || 
                 type instanceof UTF8Type ||
-                type instanceof DecimalType || type instanceof InetAddressType )
+                type instanceof DecimalType ||
+                type instanceof InetAddressType ||
+                type instanceof LexicalUUIDType ||
+                type instanceof UUIDType )
             return DataType.CHARARRAY;
         else if (type instanceof FloatType)
             return DataType.FLOAT;
@@ -525,15 +499,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                     }
                 }
 
-                // compose the CfInfo for the columfamily
-                CfInfo cfInfo = getCfInfo(client);
+                // compose the CfDef for the columfamily
+                CfDef cfDef = getCfDef(client);
 
-                if (cfInfo.cfDef != null)
-                {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
-                    properties.setProperty(signature, sb.toString());
-                }
+                if (cfDef != null)
+                    properties.setProperty(signature, cfdefToString(cfDef));
                 else
                     throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
                                                              column_family,
@@ -579,17 +549,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return cfDef;
     }
 
-    /** return the CfInf for the column family */
-    protected CfInfo getCfInfo(Cassandra.Client client)
+    /** return the CfDef for the column family */
+    protected CfDef getCfDef(Cassandra.Client client)
             throws InvalidRequestException,
                    UnavailableException,
                    TimedOutException,
                    SchemaDisagreementException,
                    TException,
+                   CharacterCodingException,
                    NotFoundException,
                    org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   IOException
+                   ConfigurationException
     {
         // get CF meta data
         String query = "SELECT type," +
@@ -643,19 +613,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             else
                 cql3Table = true;
         }
-        cfDef.column_metadata = getColumnMetadata(client);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfDef;
-        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
-            cfInfo.compactCqlTable = true;
-
-        if (cql3Table)
-            cfInfo.cql3Table = true;
-        return cfInfo;
+        cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+        return cfDef;
     }
 
     /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
@@ -772,7 +735,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     /** get a list of columns with defined index*/
     protected List<ColumnDef> getIndexes() throws IOException
     {
-        CfDef cfdef = getCfInfo(loadSignature).cfDef;
+        CfDef cfdef = getCfDef(loadSignature);
         List<ColumnDef> indexes = new ArrayList<ColumnDef>();
         for (ColumnDef cdef : cfdef.column_metadata)
         {
@@ -801,17 +764,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
     protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
     {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
+        if (validator instanceof DecimalType ||
+                validator instanceof InetAddressType ||
+                validator instanceof LexicalUUIDType ||
+                validator instanceof UUIDType)
+        {
             return validator.getString(value);
+        }
         else
             return validator.compose(value);
     }
-
-    protected class CfInfo
-    {
-        boolean compactCqlTable = false;
-        boolean cql3Table = false;
-        CfDef cfDef;
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index d9c55a1..4083236 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -103,8 +103,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -127,7 +126,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -165,7 +164,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -182,14 +181,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -211,8 +210,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, Column> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -228,21 +226,11 @@ public class CassandraStorage extends AbstractCassandraStorage
             // take care to iterate these in the same order as the schema does
             for (ColumnDef cdef : cfDef.column_metadata)
             {
-                boolean hasColumn = false;
-                boolean cql3Table = false;
-                try
+                if (cf.containsKey(cdef.name))
                 {
-                    hasColumn = cf.containsKey(cdef.name);
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
                 }
-                catch (Exception e)
-                {
-                    cql3Table = true;                  
-                }
-                if (hasColumn)
-                {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
-                }
-                else if (!cql3Table)
+                else
                 {   // otherwise, we need to add an empty tuple to take its place
                     tuple.append(TupleFactory.getInstance().newTuple());
                 }
@@ -252,7 +240,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -260,7 +248,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -370,8 +358,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
 
         if (cfDef.column_type.equals("Super"))
             return null;
@@ -418,7 +405,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+        if (!widerows)
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -712,17 +699,12 @@ public class CassandraStorage extends AbstractCassandraStorage
     }
 
     /** get a list of column for the column family */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) 
-            throws InvalidRequestException, 
-            UnavailableException, 
-            TimedOutException, 
-            SchemaDisagreementException, 
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table) 
+    throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
     {
+        if (cql3Table)
+            return new ArrayList<>();
+        
         return getColumnMeta(client, true, true);
     }
 
@@ -807,5 +789,6 @@ public class CassandraStorage extends AbstractCassandraStorage
             		                        "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
         }
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 9a9b522..50ee6b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -94,8 +94,7 @@ public class CqlStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             Map<String, ByteBuffer> keys = reader.getCurrentKey();
             Map<String, ByteBuffer> columns = reader.getCurrentValue();
             assert keys != null && columns != null;
@@ -281,8 +280,7 @@ public class CqlStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
 
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
@@ -432,7 +430,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
     
     /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,


[2/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3ad2e82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3ad2e82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3ad2e82

Branch: refs/heads/trunk
Commit: a3ad2e82249b88d4a05f24140948cdbc809d14f3
Parents: 8a506e6 639c01a
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 15:30:27 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 15:30:27 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 97 ++++++++++++++------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 55 +++++++----
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  8 +-
 3 files changed, 109 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index c881734,dbebfb5..486c781
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@@ -127,14 -128,36 +128,37 @@@ public abstract class AbstractCassandra
              setTupleValue(pair, 0, cassandraToObj(comparator, col.name()));
  
          // value
 -        if (col instanceof Column)
 +        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 +        if (validators.get(col.name()) == null)
          {
-             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-             setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+             // standard
+             Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+             ByteBuffer colName;
+             if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+             {
+                 ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+                 colName = names[names.length-1];
+             }
+             else
+                 colName = col.name();
+             if (validators.get(colName) == null)
+             {
+                 Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+                 setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+             }
+             else
+                 setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
+             return pair;
          }
          else
-             setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+         {
+             // super
+             ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+             for (IColumn subcol : col.getSubColumns())
+                 subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
+ 
+             pair.set(1, new DefaultDataBag(subcols));
+         }
          return pair;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 4083236,a7cc1ad..d9c55a1
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -124,9 -123,9 +125,9 @@@ public class CassandraStorage extends A
                              key = (ByteBuffer)reader.getCurrentKey();
                              tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                          }
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
-                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                             bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                          }
                          lastKey = null;
                          lastRow = null;
@@@ -162,9 -161,9 +163,9 @@@
                              tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                          else
                              addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
-                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                             bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                          }
                          tuple.append(bag);
                          lastKey = key;
@@@ -176,19 -175,19 +177,19 @@@
                      else
                          addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                  }
 -                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
 +                SortedMap<ByteBuffer, Column> row = (SortedMap<ByteBuffer, Column>)reader.getCurrentValue();
                  if (lastRow != null) // prepend what was read last time
                  {
 -                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                    for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                      {
-                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                         bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                      }
                      lastKey = null;
                      lastRow = null;
                  }
 -                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
 +                for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                  {
-                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                     bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                  }
              }
          }
@@@ -210,9 -209,10 +211,10 @@@
              if (!reader.nextKeyValue())
                  return null;
  
-             CfDef cfDef = getCfDef(loadSignature);
+             CfInfo cfInfo = getCfInfo(loadSignature);
+             CfDef cfDef = cfInfo.cfDef;
              ByteBuffer key = reader.getCurrentKey();
 -            Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
 +            Map<ByteBuffer, Column> cf = reader.getCurrentValue();
              assert key != null && cf != null;
  
              // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@@ -237,10 -247,10 +249,10 @@@
                  added.put(cdef.name, true);
              }
              // now add all the other columns
 -            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
 +            for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
              {
                  if (!added.containsKey(entry.getKey()))
-                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                     bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
              }
              tuple.append(bag);
              // finally, special top-level indexes if needed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[3/5] git commit: Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"

Posted by br...@apache.org.
Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"

This reverts commit a3ad2e82249b88d4a05f24140948cdbc809d14f3, reversing
changes made to 8a506e66a66c004a7a253e3dd28845517da8a967.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5dba3c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5dba3c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5dba3c6

Branch: refs/heads/trunk
Commit: e5dba3c6250eb7285777490a937b66a7092afa77
Parents: a3ad2e8
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:22 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:22 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 97 ++++++--------------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 55 ++++-------
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  8 +-
 3 files changed, 51 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 486c781..c881734 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected String outputFormatClass;
     protected int splitSize = 64 * 1024;
     protected String partitionerClass;
-    protected boolean usePartitionFilter = false;
+    protected boolean usePartitionFilter = false; 
 
     public AbstractCassandraStorage()
     {
@@ -116,9 +116,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
     {
-        CfDef cfDef = cfInfo.cfDef;
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
         // name
@@ -131,34 +130,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         if (validators.get(col.name()) == null)
         {
-            // standard
-            Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-            ByteBuffer colName;
-            if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
-            {
-                ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
-                colName = names[names.length-1];
-            }
-            else
-                colName = col.name();
-            if (validators.get(colName) == null)
-            {
-                Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-                setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
-            }
-            else
-                setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
-            return pair;
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
         }
         else
-        {
-            // super
-            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-            for (IColumn subcol : col.getSubColumns())
-                subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
-
-            pair.set(1, new DefaultDataBag(subcols));
-        }
+            setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
         return pair;
     }
 
@@ -178,16 +154,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** get the columnfamily definition for the signature */
-    protected CfInfo getCfInfo(String signature) throws IOException
+    protected CfDef getCfDef(String signature) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        String prop = property.getProperty(signature);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfdefFromString(prop.substring(2));
-        cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
-        cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
-        return cfInfo;
+        return cfdefFromString(property.getProperty(signature));
     }
 
     /** construct a map to store the mashaller type to cassandra data type mapping */
@@ -344,7 +315,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.INTEGER;
         else if (type instanceof AsciiType || 
                 type instanceof UTF8Type ||
-                type instanceof DecimalType || type instanceof InetAddressType )
+                type instanceof DecimalType ||
+                type instanceof InetAddressType ||
+                type instanceof LexicalUUIDType ||
+                type instanceof UUIDType )
             return DataType.CHARARRAY;
         else if (type instanceof FloatType)
             return DataType.FLOAT;
@@ -525,15 +499,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                     }
                 }
 
-                // compose the CfInfo for the columfamily
-                CfInfo cfInfo = getCfInfo(client);
+                // compose the CfDef for the columfamily
+                CfDef cfDef = getCfDef(client);
 
-                if (cfInfo.cfDef != null)
-                {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
-                    properties.setProperty(signature, sb.toString());
-                }
+                if (cfDef != null)
+                    properties.setProperty(signature, cfdefToString(cfDef));
                 else
                     throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
                                                              column_family,
@@ -579,17 +549,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return cfDef;
     }
 
-    /** return the CfInf for the column family */
-    protected CfInfo getCfInfo(Cassandra.Client client)
+    /** return the CfDef for the column family */
+    protected CfDef getCfDef(Cassandra.Client client)
             throws InvalidRequestException,
                    UnavailableException,
                    TimedOutException,
                    SchemaDisagreementException,
                    TException,
+                   CharacterCodingException,
                    NotFoundException,
                    org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   IOException
+                   ConfigurationException
     {
         // get CF meta data
         String query = "SELECT type," +
@@ -643,19 +613,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             else
                 cql3Table = true;
         }
-        cfDef.column_metadata = getColumnMetadata(client);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfDef;
-        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
-            cfInfo.compactCqlTable = true;
-
-        if (cql3Table)
-            cfInfo.cql3Table = true;
-        return cfInfo;
+        cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+        return cfDef;
     }
 
     /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
@@ -772,7 +735,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     /** get a list of columns with defined index*/
     protected List<ColumnDef> getIndexes() throws IOException
     {
-        CfDef cfdef = getCfInfo(loadSignature).cfDef;
+        CfDef cfdef = getCfDef(loadSignature);
         List<ColumnDef> indexes = new ArrayList<ColumnDef>();
         for (ColumnDef cdef : cfdef.column_metadata)
         {
@@ -801,17 +764,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
     protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
     {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
+        if (validator instanceof DecimalType ||
+                validator instanceof InetAddressType ||
+                validator instanceof LexicalUUIDType ||
+                validator instanceof UUIDType)
+        {
             return validator.getString(value);
+        }
         else
             return validator.compose(value);
     }
-
-    protected class CfInfo
-    {
-        boolean compactCqlTable = false;
-        boolean cql3Table = false;
-        CfDef cfDef;
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index d9c55a1..4083236 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -103,8 +103,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -127,7 +126,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -165,7 +164,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -182,14 +181,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -211,8 +210,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, Column> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -228,21 +226,11 @@ public class CassandraStorage extends AbstractCassandraStorage
             // take care to iterate these in the same order as the schema does
             for (ColumnDef cdef : cfDef.column_metadata)
             {
-                boolean hasColumn = false;
-                boolean cql3Table = false;
-                try
+                if (cf.containsKey(cdef.name))
                 {
-                    hasColumn = cf.containsKey(cdef.name);
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
                 }
-                catch (Exception e)
-                {
-                    cql3Table = true;                  
-                }
-                if (hasColumn)
-                {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
-                }
-                else if (!cql3Table)
+                else
                 {   // otherwise, we need to add an empty tuple to take its place
                     tuple.append(TupleFactory.getInstance().newTuple());
                 }
@@ -252,7 +240,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -260,7 +248,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -370,8 +358,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
 
         if (cfDef.column_type.equals("Super"))
             return null;
@@ -418,7 +405,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+        if (!widerows)
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -712,17 +699,12 @@ public class CassandraStorage extends AbstractCassandraStorage
     }
 
     /** get a list of column for the column family */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) 
-            throws InvalidRequestException, 
-            UnavailableException, 
-            TimedOutException, 
-            SchemaDisagreementException, 
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table) 
+    throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
     {
+        if (cql3Table)
+            return new ArrayList<>();
+        
         return getColumnMeta(client, true, true);
     }
 
@@ -807,5 +789,6 @@ public class CassandraStorage extends AbstractCassandraStorage
             		                        "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
         }
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 9a9b522..50ee6b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -94,8 +94,7 @@ public class CqlStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             Map<String, ByteBuffer> keys = reader.getCurrentKey();
             Map<String, ByteBuffer> columns = reader.getCurrentValue();
             assert keys != null && columns != null;
@@ -281,8 +280,7 @@ public class CqlStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
 
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
@@ -432,7 +430,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
     
     /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,