You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/20 18:24:09 UTC

git commit: Pig wide row support. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909

Updated Branches:
  refs/heads/cassandra-1.1.0 5ccdc7f15 -> 015dc3fca


Pig wide row support.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/015dc3fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/015dc3fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/015dc3fc

Branch: refs/heads/cassandra-1.1.0
Commit: 015dc3fca9c3117fb0268d6b8d201d7000568aab
Parents: 5ccdc7f
Author: Brandon Williams <br...@apache.org>
Authored: Fri Apr 20 11:19:48 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Apr 20 11:19:48 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   87 ++++++++++++++-
 2 files changed, 87 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b19d118..e30004a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
  * (cql) Fix type in CQL3 ALTER TABLE preventing update (CASSANDRA-4170)
  * (cql) Throw invalid exception from CQL3 on obsolete options (CASSANDRA-4171)
  * (cqlsh) fix recognizing uppercase SELECT keyword (CASSANDRA-4161)
+ * Pig: wide row support (CASSANDRA-3909)
 Merged from 1.0:
  * avoid streaming empty files with bulk loader if sstablewriter errors out
    (CASSANDRA-3946)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index f10dde5..0609d11 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -78,9 +78,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+    public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT"; // name of the setting that turns on wide-row input; NOTE(review): its consumer checks System.getenv(PIG_WIDEROW_INPUT) != null but then parses System.getProperty(PIG_WIDEROW_INPUT) - confirm which source is intended
 
     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+    private final static boolean DEFAULT_WIDEROW_INPUT = false; // value assigned to widerows before the PIG_WIDEROW_INPUT check is made
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -100,6 +102,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String inputFormatClass;
     private String outputFormatClass;
     private int limit;
+    private boolean widerows; // when true, getNext() delegates to getNextWide()
+    // wide row hacks
+    private Map<ByteBuffer,IColumn> lastRow; // record that getNextWide() read past a key boundary, kept for the following call
+    private boolean hasNext = true; // result of the most recent reader.nextKeyValue() call made by getNextWide()
+
 
     public CassandraStorage()
     {
@@ -119,10 +126,85 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         return limit;
     }
+    
+    public Tuple getNextWide() throws IOException // wide-row load: returns one (key, bag of column tuples) tuple per call, or null once the input is exhausted and nothing is parked; throws IOException if the reader is interrupted
+    {   // consumes reader records until the key changes; a record read past the boundary is parked in lastRow for the next call
+        CfDef cfDef = getCfDef(loadSignature);
+        ByteBuffer key = null; // key of the row being assembled in this call; null until the first record is seen
+        Tuple tuple = TupleFactory.getInstance().newTuple(); // result: field 0 is the key, field 1 is the bag
+        DefaultDataBag bag = new DefaultDataBag(); // accumulates one tuple per column
+        try
+        {
+            while(true)
+            {
+                hasNext = reader.nextKeyValue();
+                if (!hasNext) // input exhausted: flush whatever is still pending
+                {
+                    if (lastRow != null)
+                    {
+                        if (tuple.size() == 0) // lastRow is a new one
+                        {
+                            key = (ByteBuffer)reader.getCurrentKey(); // NOTE(review): relies on the reader still exposing the last key after nextKeyValue() returned false - confirm
+                            tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                        }
+                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        {
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        }
+                        lastRow = null; // parked record has been consumed
+                        tuple.append(bag);
+                        return tuple;
+                    }
+                    else
+                    {
+                        if (tuple.size() == 1) // rare case of just one wide row, key already set
+                        {
+                            tuple.append(bag);
+                            return tuple;
+                        }
+                        else
+                            return null; // nothing read and nothing parked: signal end of input
+                    }
+                }
+                if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
+                {
+                    // read too much, hold on to it for next time
+                    lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                    // but return what we have so far
+                    tuple.append(bag);
+                    return tuple;
+                }
+                if (key == null) // only set the key on the first iteration
+                {
+                    key = (ByteBuffer)reader.getCurrentKey();
+                    tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                }
+                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                if (lastRow != null) // prepend what was read last time; NOTE(review): the reader was already advanced at the top of this call, so if the parked row's key had no further records its columns end up in the bag of the following key - verify
+                {
+                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                    {
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    }
+                    lastRow = null;
+                }
+                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet()) // add the columns of the record just read
+                {
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e.getMessage(), e); // keep the InterruptedException as the cause so its stack trace is not lost
+        }
+    }
 
     @Override
     public Tuple getNext() throws IOException
     {
+        if (widerows)
+            return getNextWide();
         try
         {
             // load the next pair
@@ -423,7 +505,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
             ConfigHelper.setInputSlicePredicate(conf, predicate);
         }
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        widerows = DEFAULT_WIDEROW_INPUT;
+        if (System.getenv(PIG_WIDEROW_INPUT) != null)
+            widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
         if (ConfigHelper.getInputRpcPort(conf) == 0)