You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/26 13:17:53 UTC
[4/5] git commit: Fix 5488 round 2
Fix 5488 round 2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2dd73d17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2dd73d17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2dd73d17
Branch: refs/heads/cassandra-1.2
Commit: 2dd73d171068d743befcd445a14751032d232e4e
Parents: 0db9406
Author: Brandon Williams <br...@apache.org>
Authored: Wed May 22 11:18:59 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed May 22 11:19:05 2013 -0500
----------------------------------------------------------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 34 ++++++++++-----
1 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2dd73d17/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index b681ee3..cf1c08f 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -130,7 +130,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
- Tuple tuple = TupleFactory.getInstance().newTuple();
+ Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
try
{
@@ -139,12 +139,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
hasNext = reader.nextKeyValue();
if (!hasNext)
{
+ if (tuple == null)
+ tuple = TupleFactory.getInstance().newTuple();
+
if (lastRow != null)
{
if (tuple.size() == 0) // lastRow is a new one
{
key = (ByteBuffer)reader.getCurrentKey();
- tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+ tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
}
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
@@ -180,7 +183,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
key = (ByteBuffer)reader.getCurrentKey();
if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
{
- tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+ if (tuple == null)
+ tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+ else
+ addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -190,7 +196,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
return tuple;
}
- tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+ if (tuple == null)
+ tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+ else
+ addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
@@ -233,7 +242,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
// NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
- Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class()));
+ Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
DefaultDataBag bag = new DefaultDataBag();
// we must add all the indexed columns first to match the schema
@@ -292,12 +301,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return t;
}
- private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+ private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+ {
+ Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ addKeyToTuple(tuple, key, cfDef, comparator);
+ return tuple;
+ }
+
+ private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
{
- if( tuple == null )
- {
- tuple = TupleFactory.getInstance().newTuple(1);
- }
if( comparator instanceof AbstractCompositeType )
{
setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
@@ -306,7 +318,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
}
- return tuple;
+
}
private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException