You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/20 18:24:09 UTC
git commit: Pig wide row support. Patch by brandonwilliams,
reviewed by xedin for CASSANDRA-3909
Updated Branches:
refs/heads/cassandra-1.1.0 5ccdc7f15 -> 015dc3fca
Pig wide row support.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/015dc3fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/015dc3fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/015dc3fc
Branch: refs/heads/cassandra-1.1.0
Commit: 015dc3fca9c3117fb0268d6b8d201d7000568aab
Parents: 5ccdc7f
Author: Brandon Williams <br...@apache.org>
Authored: Fri Apr 20 11:19:48 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Apr 20 11:19:48 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/pig/CassandraStorage.java | 87 ++++++++++++++-
2 files changed, 87 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b19d118..e30004a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
* (cql) Fix type in CQL3 ALTER TABLE preventing update (CASSANDRA-4170)
* (cql) Throw invalid exception from CQL3 on obsolete options (CASSANDRA-4171)
* (cqlsh) fix recognizing uppercase SELECT keyword (CASSANDRA-4161)
+ * Pig: wide row support (CASSANDRA-3909)
Merged from 1.0:
* avoid streaming empty files with bulk loader if sstablewriter errors out
(CASSANDRA-3946)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index f10dde5..0609d11 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -78,9 +78,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+ public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+ private final static boolean DEFAULT_WIDEROW_INPUT = false;
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -100,6 +102,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private String inputFormatClass;
private String outputFormatClass;
private int limit;
+ private boolean widerows;
+ // wide row hacks
+ private Map<ByteBuffer,IColumn> lastRow;
+ private boolean hasNext = true;
+
public CassandraStorage()
{
@@ -119,10 +126,85 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
return limit;
}
+
+ // Key that the over-read batch held in lastRow belongs to; set and cleared together with lastRow.
+ private ByteBuffer lastKey;
+
+ /**
+  * Reads the next wide row and returns it as a two-field tuple: the row key (as a DataByteArray)
+  * followed by a bag holding one tuple per column.
+  *
+  * Consecutive records from the reader that share the same key are merged into a single tuple.
+  * Detecting the end of a row requires reading the first record of the next key; that record
+  * (its key in lastKey, its columns in lastRow) is held over and becomes the start of the tuple
+  * built by the following call, so its columns are always emitted under their own key.
+  *
+  * @return the next (key, bag-of-columns) tuple, or null once the reader is exhausted and
+  *         nothing is held over
+  * @throws IOException if reading fails, or if the read is interrupted (the InterruptedException
+  *         is attached as the cause)
+  */
+ public Tuple getNextWide() throws IOException
+ {
+     CfDef cfDef = getCfDef(loadSignature);
+     ByteBuffer key = null;
+     Tuple tuple = TupleFactory.getInstance().newTuple();
+     DefaultDataBag bag = new DefaultDataBag();
+
+     if (lastRow != null) // start from what was over-read last time, under the key it was read with
+     {
+         key = lastKey;
+         tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+         {
+             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+         }
+         lastRow = null;
+         lastKey = null;
+     }
+
+     try
+     {
+         while (true)
+         {
+             hasNext = reader.nextKeyValue();
+             if (!hasNext)
+             {
+                 if (key == null) // nothing held over and nothing read: input is exhausted
+                     return null;
+                 // input ended in the middle of (or right after) the current row: emit it
+                 tuple.append(bag);
+                 return tuple;
+             }
+
+             ByteBuffer currentKey = (ByteBuffer)reader.getCurrentKey();
+             SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+
+             if (key != null && !currentKey.equals(key)) // key changed
+             {
+                 // read too much, hold on to it (and the key it belongs to) for next time
+                 lastKey = currentKey;
+                 lastRow = row;
+                 // but return what we have so far
+                 tuple.append(bag);
+                 return tuple;
+             }
+             if (key == null) // only set the key on the first record of this row
+             {
+                 key = currentKey;
+                 tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+             }
+             for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+             {
+                 bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+             }
+         }
+     }
+     catch (InterruptedException e)
+     {
+         // keep the original exception as the cause so the interruption is visible in stack traces
+         throw new IOException(e.getMessage(), e);
+     }
+ }
@Override
public Tuple getNext() throws IOException
{
+ if (widerows)
+ return getNextWide();
try
{
// load the next pair
@@ -423,7 +505,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
ConfigHelper.setInputSlicePredicate(conf, predicate);
}
- ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+ widerows = DEFAULT_WIDEROW_INPUT;
+ // Read the value from the same environment variable that is tested for presence.
+ // System.getProperty() looks up a JVM system property instead, and Boolean.valueOf(null) is false,
+ // so the flag could never be switched on through the environment.
+ if (System.getenv(PIG_WIDEROW_INPUT) != null)
+     widerows = Boolean.valueOf(System.getenv(PIG_WIDEROW_INPUT));
+ ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
setConnectionInformation();
if (ConfigHelper.getInputRpcPort(conf) == 0)