You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/31 22:41:31 UTC

git commit: (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251

Updated Branches:
  refs/heads/cassandra-0.8 9ecaa2a5c -> dbd8ced10


(Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbd8ced1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbd8ced1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbd8ced1

Branch: refs/heads/cassandra-0.8
Commit: dbd8ced107fad720c416d7e4919cb33884378a02
Parents: 9ecaa2a
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Jan 31 23:24:27 2012 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Wed Feb 1 00:39:37 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../cassandra/hadoop/pig/CassandraStorage.java     |   45 ++++++++++-----
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64b2747..a6a724f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,8 @@
  * use correct list of replicas for LOCAL_QUORUM reads when read repair
    is disabled (CASSANDRA-3696)
  * block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733)
-
+ * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
+   case (CASSANDRA-3251)
 
 0.8.9
  * avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index fd0959d..a0dec20 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -128,7 +128,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
             {
-                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
+                columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
 
             tuple.set(1, new DefaultDataBag(columns));
@@ -140,29 +140,31 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
     }
 
-    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
+    private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
-        setTupleValue(pair, 0, marshallers.get(0).compose(name));
+        setTupleValue(pair, 0, comparator.compose(col.name()));
         if (col instanceof Column)
         {
             // standard
-            if (validators.get(name) == null)
+            if (validators.get(col.name()) == null)
                 setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
             else
-                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
+                setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
             return pair;
         }
+        else
+        {
+            // super
+            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+            for (IColumn subcol : col.getSubColumns())
+                subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
 
-        // super
-        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-        for (IColumn subcol : col.getSubColumns())
-            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
-        
-        pair.set(1, new DefaultDataBag(subcols));
+            pair.set(1, new DefaultDataBag(subcols));
+        }
         return pair;
     }
 
@@ -188,12 +190,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
     {
         ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
-        AbstractType comparator = null;
-        AbstractType default_validator = null;
-        AbstractType key_validator = null;
+        AbstractType comparator;
+        AbstractType subcomparator;
+        AbstractType default_validator;
+        AbstractType key_validator;
         try
         {
             comparator = TypeParser.parse(cfDef.getComparator_type());
+            subcomparator = TypeParser.parse(cfDef.getSubcomparator_type());
             default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
             key_validator = TypeParser.parse(cfDef.getKey_validation_class());
         }
@@ -205,6 +209,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         marshallers.add(comparator);
         marshallers.add(default_validator);
         marshallers.add(key_validator);
+        marshallers.add(subcomparator);
         return marshallers;
     }
 
@@ -230,6 +235,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return validators;
     }
 
+    private AbstractType parseType(String type) throws IOException
+    {
+        try
+        {
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
     @Override
     public InputFormat getInputFormat()
     {