You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/10/11 20:58:31 UTC

[2/3] git commit: Pig: fix widerow input with single column rows Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789

Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4355aa87
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4355aa87
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4355aa87

Branch: refs/heads/trunk
Commit: 4355aa877f29b653614cb801a2b57861f4cef428
Parents: 178c934
Author: Brandon Williams <br...@apache.org>
Authored: Thu Oct 11 13:54:32 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Oct 11 13:58:16 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   17 ++++++++++++++-
 2 files changed, 17 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac3a157..40cd7b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 1.1.6
  * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
  * preflight check ttl for maximum of 20 years (CASSANDRA-4771)
+ * (Pig) fix widerow input with single column rows (CASSANDRA-4789)
  * Fix HH to compact with correct gcBefore, which avoids wiping out
    undelivered hints (CASSANDRA-4772)
  * LCS will merge up to 32 L0 sstables as intended (CASSANDRA-4778)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 434ca7f..49d8eac 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -109,10 +109,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private boolean widerows;
     private boolean usePartitionFilter;
     // wide row hacks
+    private ByteBuffer lastKey;
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
 
-
     public CassandraStorage()
     {
         this(1024);
@@ -156,6 +156,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
+                        lastKey = null;
                         lastRow = null;
                         tuple.append(bag);
                         return tuple;
@@ -174,6 +175,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
                 {
                     // read too much, hold on to it for next time
+                    lastKey = (ByteBuffer)reader.getCurrentKey();
                     lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                     // but return what we have so far
                     tuple.append(bag);
@@ -182,6 +184,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (key == null) // only set the key on the first iteration
                 {
                     key = (ByteBuffer)reader.getCurrentKey();
+                    if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
+                    {
+                        tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset()));
+                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        {
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        }
+                        tuple.append(bag);
+                        lastKey = key;
+                        lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                        return tuple;
+                    }
                     tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
@@ -191,6 +205,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     {
                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
+                    lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())