You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/05/21 18:14:47 UTC
[3/6] git commit: Fix NPE in Pig's widerow mode. Patch by Sheetal
Gorsani and Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-5488
Fix NPE in Pig's widerow mode.
Patch by Sheetal Gorsani and Jeremy Hanna, reviewed by brandonwilliams
for CASSANDRA-5488
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d2ce5f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d2ce5f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d2ce5f9
Branch: refs/heads/trunk
Commit: 7d2ce5f957b1fb392617c1ff05a561571eccd593
Parents: c5dc029
Author: Brandon Williams <br...@apache.org>
Authored: Tue May 21 11:08:50 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue May 21 11:08:50 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/pig/test/test_storage.pig | 2 +-
.../cassandra/hadoop/pig/CassandraStorage.java | 23 ++++++--------
3 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c89987..256e69a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
(CASSANDRA-5497)
* fsync leveled manifest to avoid corruption (CASSANDRA-5535)
* Fix Bound intersection computation (CASSANDRA-5551)
+ * Fix NPE in Pig's widerow mode (CASSANDRA-5488)
1.1.11
* Fix trying to load deleted row into row cache on startup (CASSANDRA-4463)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index 026cb02..93dd91f 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -1,4 +1,4 @@
-rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+rows = LOAD 'cassandra://PigTest/SomeApp?widerows=true' USING CassandraStorage();
-- full copy
STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
-- single tuple
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 55ccbb9..b681ee3 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -144,7 +144,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
if (tuple.size() == 0) // lastRow is a new one
{
key = (ByteBuffer)reader.getCurrentKey();
- addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
}
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
@@ -180,7 +180,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
key = (ByteBuffer)reader.getCurrentKey();
if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
{
- addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -190,7 +190,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
return tuple;
}
- addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
@@ -233,7 +233,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
// NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
- Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+ Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class()));
DefaultDataBag bag = new DefaultDataBag();
// we must add all the indexed columns first to match the schema
@@ -292,15 +292,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return t;
}
- private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(1);
- addKeyToTuple(tuple, key, cfDef, comparator);
- return tuple;
- }
-
- private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+ private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
+ if( tuple == null )
+ {
+ tuple = TupleFactory.getInstance().newTuple(1);
+ }
if( comparator instanceof AbstractCompositeType )
{
setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
@@ -309,7 +306,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
}
-
+ return tuple;
}
private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException