You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 20:26:43 UTC

[6/21] git commit: update get_paged_slice to allow starting with a key; fixes for WideRowIterator patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883

update get_paged_slice to allow starting with a key; fixes for WideRowIterator
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97aa922a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97aa922a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97aa922a

Branch: refs/heads/cassandra-1.1.0
Commit: 97aa922a7476dce06121ae289877abccf161afae
Parents: dbc0f59
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 10 16:06:39 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Apr 11 13:24:56 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/ColumnFamilyInputFormat.java  |   25 ++-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |  136 ++++++++------
 .../apache/cassandra/thrift/CassandraServer.java   |    5 +-
 .../apache/cassandra/thrift/ThriftValidation.java  |   19 +--
 4 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index ef56678..354903d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -87,6 +89,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
 
     private String keyspace;
     private String cfName;
+    private IPartitioner partitioner;
 
     private static void validateConfiguration(Configuration conf)
     {
@@ -100,8 +103,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         }
         if (ConfigHelper.getInputInitialAddress(conf) == null)
             throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-
-        // input partitioner is optional -- used only if requesting an ordered key scan
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the Cassandra partitioner class");
     }
 
     public List<InputSplit> getSplits(JobContext context) throws IOException
@@ -115,6 +118,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
 
         keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
         cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
+        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+        logger.debug("partitioner is " + partitioner);
 
         // cannonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = Executors.newCachedThreadPool();
@@ -124,11 +129,9 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         {
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            IPartitioner partitioner = null;
             Range<Token> jobRange = null;
             if (jobKeyRange != null && jobKeyRange.start_token != null)
             {
-                partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
                 assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
                 assert jobKeyRange.start_key == null : "only start_token supported";
                 assert jobKeyRange.end_key == null : "only end_token supported";
@@ -219,11 +222,19 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
 		        endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
             }
 
+            Token.TokenFactory factory = partitioner.getTokenFactory();
             for (int i = 1; i < tokens.size(); i++)
             {
-                ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints);
-                logger.debug("adding " + split);
-                splits.add(split);
+                Token left = factory.fromString(tokens.get(i - 1));
+                Token right = factory.fromString(tokens.get(i));
+                Range<Token> range = new Range<Token>(left, right, partitioner);
+                List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
+                for (Range<Token> subrange : ranges)
+                {
+                    ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints);
+                    logger.debug("adding " + split);
+                    splits.add(split);
+                }
             }
             return splits;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 483c040..600cf13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,10 +29,9 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.ArrayUtils;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.ConfigurationException;
@@ -55,6 +54,8 @@ import org.apache.thrift.transport.TSocket;
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
     implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
     public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
 
     private ColumnFamilySplit split;
@@ -179,6 +180,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         }
 
         iter = widerows ? new WideRowIterator() : new StaticRowIterator();
+        logger.debug("created {}", iter);
     }
 
     public boolean nextKeyValue() throws IOException
@@ -230,9 +232,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
     {
         protected List<KeySlice> rows;
-        protected KeySlice lastRow;
         protected int totalRead = 0;
-        protected int i = 0;
         protected final AbstractType<?> comparator;
         protected final AbstractType<?> subComparator;
         protected final IPartitioner partitioner;
@@ -299,7 +299,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             return sc;
         }
 
-        private IColumn unthriftifySimple(Column column)
+        protected IColumn unthriftifySimple(Column column)
         {
             return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
         }
@@ -322,23 +322,23 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class StaticRowIterator extends RowIterator
     {
+        protected int i = 0;
+
         private void maybeInit()
         {
             // check if we need another batch
-            if (rows != null && i >= rows.size())
-                rows = null;
-
-            if (rows != null)
+            if (rows != null && i < rows.size())
                 return;
 
             String startToken;
-            if (lastRow == null)
+            if (totalRead == 0)
             {
+                // first request
                 startToken = split.getStartToken();
             }
             else
             {
-                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key));
                 if (startToken.equals(split.getEndToken()))
                 {
                     // reached end of the split
@@ -362,9 +362,6 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                     return;
                 }
 
-                // prepare for the next slice to be read
-                lastRow = Iterables.getLast(rows);
-
                 // remove ghosts when fetching all columns
                 if (isEmptyPredicate)
                 {
@@ -415,64 +412,49 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private Iterator<ColumnOrSuperColumn> wideColumns;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> wideColumns;
 
         private void maybeInit()
         {
             if (wideColumns != null && wideColumns.hasNext())
                 return;
 
-            // check if we need another batch
-            if (rows != null && ++i >= rows.size())
-                rows = null;
-
-            if (rows != null)
-            {
-                wideColumns = rows.get(i).columns.iterator();
-                return;
-            }
-
-            String startToken;
+            KeyRange keyRange;
             ByteBuffer startColumn;
-            if (lastRow == null)
+            if (totalRead == 0)
             {
-                startToken = split.getStartToken();
+                String startToken = split.getStartToken();
+                keyRange = new KeyRange(batchSize)
+                          .setStart_token(startToken)
+                          .setEnd_token(split.getEndToken())
+                          .setRow_filter(filter);
                 startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
             }
             else
             {
-                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+                KeySlice lastRow = Iterables.getLast(rows);
+                logger.debug("Starting with last-seen row {}", lastRow.key);
+                keyRange = new KeyRange(batchSize)
+                          .setStart_key(lastRow.key)
+                          .setEnd_token(split.getEndToken())
+                          .setRow_filter(filter);
                 startColumn = Iterables.getLast(lastRow.columns).column.name;
             }
 
-            KeyRange keyRange = new KeyRange(batchSize)
-                                .setStart_token(startToken)
-                                .setEnd_token(split.getEndToken())
-                                .setRow_filter(filter);
             try
             {
                 rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel);
-
-                // nothing found?
-                if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty())
-                {
-                    rows = null;
-                    return;
-                }
-
-                // nothing new? reached the end
-                if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.name.equals(startColumn)))
-                {
+                int n = 0;
+                for (KeySlice row : rows)
+                    n += row.columns.size();
+                logger.debug("read {} columns in {} rows for {} starting with {}",
+                             new Object[]{ n, rows.size(), keyRange, startColumn });
+
+                wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows));
+                if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(startColumn))
+                    wideColumns.next();
+                if (!wideColumns.hasNext())
                     rows = null;
-                    return;
-                }
-
-                // prepare for the next slice to be read
-                lastRow = Iterables.getLast(rows);
-
-                // reset to iterate through this new batch
-                i = 0;
-                wideColumns = rows.get(i).columns.iterator();
             }
             catch (Exception e)
             {
@@ -487,9 +469,47 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 return endOfData();
 
             totalRead++;
-            ColumnOrSuperColumn cosc = wideColumns.next();
-            ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc));
-            return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(rows.get(i).key, map);
+            return wideColumns.next();
+        }
+
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+        {
+            private final Iterator<KeySlice> rows;
+            private Iterator<ColumnOrSuperColumn> columns;
+            public KeySlice currentRow;
+
+            public WideColumnIterator(List<KeySlice> rows)
+            {
+                this.rows = rows.iterator();
+                if (this.rows.hasNext())
+                    nextRow();
+                else
+                    columns = Iterators.emptyIterator();
+            }
+
+            private void nextRow()
+            {
+                currentRow = rows.next();
+                columns = currentRow.columns.iterator();
+            }
+
+            protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+            {
+                while (true)
+                {
+                    if (columns.hasNext())
+                    {
+                        ColumnOrSuperColumn cosc = columns.next();
+                        ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftifySimple(cosc.column));
+                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(currentRow.key, map);
+                    }
+
+                    if (!rows.hasNext())
+                        return endOfData();
+
+                    nextRow();
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7aceb0e..7cb77d7 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -733,6 +733,7 @@ public class CassandraServer implements Cassandra.Iface
         AbstractBounds<RowPosition> bounds;
         if (range.start_key == null)
         {
+            // (token, key) is unsupported, assume (token, token)
             Token.TokenFactory tokenFactory = p.getTokenFactory();
             Token left = tokenFactory.fromString(range.start_token);
             Token right = tokenFactory.fromString(range.end_token);
@@ -740,7 +741,9 @@ public class CassandraServer implements Cassandra.Iface
         }
         else
         {
-            bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+            RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token).maxKeyBound(p)
+                                                    : RowPosition.forKey(range.end_key, p);
+            bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
         }
 
         List<Row> rows;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 25c751c..a77bfb6 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -479,20 +479,17 @@ public class ThriftValidation
 
     public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException
     {
-        if ((range.start_key == null) != (range.end_key == null))
-        {
-            throw new InvalidRequestException("start key and end key must either both be non-null, or both be null");
-        }
-        if ((range.start_token == null) != (range.end_token == null))
-        {
-            throw new InvalidRequestException("start token and end token must either both be non-null, or both be null");
-        }
-        if ((range.start_key == null) == (range.start_token == null))
+        if ((range.start_key == null) == (range.start_token == null)
+            || (range.end_key == null) == (range.end_token == null))
         {
             throw new InvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified");
         }
 
-        if (range.start_key != null)
+        // (key, token) is supported (for wide-row CFRR) but not (token, key)
+        if (range.start_token != null && range.end_key != null)
+            throw new InvalidRequestException("start token + end key is not a supported key range");
+
+        if (range.start_key != null && range.end_key != null)
         {
             IPartitioner p = StorageService.getPartitioner();
             Token startToken = p.getToken(range.start_key);
@@ -510,7 +507,7 @@ public class ThriftValidation
 
         if (!isEmpty(range.row_filter) && superColumn != null)
         {
-            throw new InvalidRequestException("super columns are not yet supported for indexing");
+            throw new InvalidRequestException("super columns are not supported for indexing");
         }
 
         if (range.count <= 0)