You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/05/22 18:20:11 UTC

git commit: Fix 5488 round 2

Updated Branches:
  refs/heads/cassandra-1.1 0db940695 -> 2dd73d171


Fix 5488 round 2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2dd73d17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2dd73d17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2dd73d17

Branch: refs/heads/cassandra-1.1
Commit: 2dd73d171068d743befcd445a14751032d232e4e
Parents: 0db9406
Author: Brandon Williams <br...@apache.org>
Authored: Wed May 22 11:18:59 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed May 22 11:19:05 2013 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   34 ++++++++++-----
 1 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2dd73d17/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index b681ee3..cf1c08f 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -130,7 +130,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
-        Tuple tuple = TupleFactory.getInstance().newTuple();
+        Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
         try
         {
@@ -139,12 +139,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 hasNext = reader.nextKeyValue();
                 if (!hasNext)
                 {
+                    if (tuple == null)
+                        tuple = TupleFactory.getInstance().newTuple();
+
                     if (lastRow != null)
                     {
                         if (tuple.size() == 0) // lastRow is a new one
                         {
                             key = (ByteBuffer)reader.getCurrentKey();
-                            tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+                            tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                         }
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
@@ -180,7 +183,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     key = (ByteBuffer)reader.getCurrentKey();
                     if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
                     {
-                        tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                        if (tuple == null)
+                            tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                        else
+                            addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -190,7 +196,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                         return tuple;
                     }
-                    tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                    if (tuple == null)
+                        tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+                    else
+                        addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
@@ -233,7 +242,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
             // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
 
-            Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class()));
+            Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
             DefaultDataBag bag = new DefaultDataBag();
 
             // we must add all the indexed columns first to match the schema
@@ -292,12 +301,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return t;
     }
 
-    private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple tuple = TupleFactory.getInstance().newTuple(1);
+        addKeyToTuple(tuple, key, cfDef, comparator);
+        return tuple;
+    }
+
+    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
     {
-        if( tuple == null )
-        {
-            tuple = TupleFactory.getInstance().newTuple(1);
-        }
         if( comparator instanceof AbstractCompositeType )
         {
             setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
@@ -306,7 +318,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
         }
-        return tuple;
+
     }
 
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException