You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text format.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/10/11 20:58:31 UTC
[2/3] git commit: Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789
Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4355aa87
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4355aa87
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4355aa87
Branch: refs/heads/trunk
Commit: 4355aa877f29b653614cb801a2b57861f4cef428
Parents: 178c934
Author: Brandon Williams <br...@apache.org>
Authored: Thu Oct 11 13:54:32 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Oct 11 13:58:16 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/pig/CassandraStorage.java | 17 ++++++++++++++-
2 files changed, 17 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac3a157..40cd7b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
1.1.6
* fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
* preflight check ttl for maximum of 20 years (CASSANDRA-4771)
+ * (Pig) fix widerow input with single column rows (CASSANDRA-4789)
* Fix HH to compact with correct gcBefore, which avoids wiping out
undelivered hints (CASSANDRA-4772)
* LCS will merge up to 32 L0 sstables as intended (CASSANDRA-4778)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 434ca7f..49d8eac 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -109,10 +109,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private boolean widerows;
private boolean usePartitionFilter;
// wide row hacks
+ private ByteBuffer lastKey;
private Map<ByteBuffer,IColumn> lastRow;
private boolean hasNext = true;
-
public CassandraStorage()
{
this(1024);
@@ -156,6 +156,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
+ lastKey = null;
lastRow = null;
tuple.append(bag);
return tuple;
@@ -174,6 +175,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
{
// read too much, hold on to it for next time
+ lastKey = (ByteBuffer)reader.getCurrentKey();
lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
// but return what we have so far
tuple.append(bag);
@@ -182,6 +184,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
if (key == null) // only set the key on the first iteration
{
key = (ByteBuffer)reader.getCurrentKey();
+ if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
+ {
+ tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset()));
+ for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ {
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ }
+ tuple.append(bag);
+ lastKey = key;
+ lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ return tuple;
+ }
tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
}
SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
@@ -191,6 +205,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
+ lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())