You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 20:26:43 UTC
[6/21] git commit: update get_paged_slice to allow starting with a
key; fixes for WideRowIterator. Patch by jbellis;
reviewed by brandonwilliams for CASSANDRA-3883
update get_paged_slice to allow starting with a key; fixes for WideRowIterator
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97aa922a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97aa922a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97aa922a
Branch: refs/heads/cassandra-1.1.0
Commit: 97aa922a7476dce06121ae289877abccf161afae
Parents: dbc0f59
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 10 16:06:39 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Apr 11 13:24:56 2012 -0500
----------------------------------------------------------------------
.../cassandra/hadoop/ColumnFamilyInputFormat.java | 25 ++-
.../cassandra/hadoop/ColumnFamilyRecordReader.java | 136 ++++++++------
.../apache/cassandra/thrift/CassandraServer.java | 5 +-
.../apache/cassandra/thrift/ThriftValidation.java | 19 +--
4 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index ef56678..354903d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -87,6 +89,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
private String keyspace;
private String cfName;
+ private IPartitioner partitioner;
private static void validateConfiguration(Configuration conf)
{
@@ -100,8 +103,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
}
if (ConfigHelper.getInputInitialAddress(conf) == null)
throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-
- // input partitioner is optional -- used only if requesting an ordered key scan
+ if (ConfigHelper.getInputPartitioner(conf) == null)
+ throw new UnsupportedOperationException("You must set the Cassandra partitioner class");
}
public List<InputSplit> getSplits(JobContext context) throws IOException
@@ -115,6 +118,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
+ partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+ logger.debug("partitioner is " + partitioner);
// cannonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = Executors.newCachedThreadPool();
@@ -124,11 +129,9 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
{
List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
- IPartitioner partitioner = null;
Range<Token> jobRange = null;
if (jobKeyRange != null && jobKeyRange.start_token != null)
{
- partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
assert jobKeyRange.start_key == null : "only start_token supported";
assert jobKeyRange.end_key == null : "only end_token supported";
@@ -219,11 +222,19 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
}
+ Token.TokenFactory factory = partitioner.getTokenFactory();
for (int i = 1; i < tokens.size(); i++)
{
- ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints);
- logger.debug("adding " + split);
- splits.add(split);
+ Token left = factory.fromString(tokens.get(i - 1));
+ Token right = factory.fromString(tokens.get(i));
+ Range<Token> range = new Range<Token>(left, right, partitioner);
+ List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
+ for (Range<Token> subrange : ranges)
+ {
+ ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints);
+ logger.debug("adding " + split);
+ splits.add(split);
+ }
}
return splits;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 483c040..600cf13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,10 +29,9 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.ArrayUtils;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.ConfigurationException;
@@ -55,6 +54,8 @@ import org.apache.thrift.transport.TSocket;
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
private ColumnFamilySplit split;
@@ -179,6 +180,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
iter = widerows ? new WideRowIterator() : new StaticRowIterator();
+ logger.debug("created {}", iter);
}
public boolean nextKeyValue() throws IOException
@@ -230,9 +232,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
{
protected List<KeySlice> rows;
- protected KeySlice lastRow;
protected int totalRead = 0;
- protected int i = 0;
protected final AbstractType<?> comparator;
protected final AbstractType<?> subComparator;
protected final IPartitioner partitioner;
@@ -299,7 +299,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return sc;
}
- private IColumn unthriftifySimple(Column column)
+ protected IColumn unthriftifySimple(Column column)
{
return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
}
@@ -322,23 +322,23 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class StaticRowIterator extends RowIterator
{
+ protected int i = 0;
+
private void maybeInit()
{
// check if we need another batch
- if (rows != null && i >= rows.size())
- rows = null;
-
- if (rows != null)
+ if (rows != null && i < rows.size())
return;
String startToken;
- if (lastRow == null)
+ if (totalRead == 0)
{
+ // first request
startToken = split.getStartToken();
}
else
{
- startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+ startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key));
if (startToken.equals(split.getEndToken()))
{
// reached end of the split
@@ -362,9 +362,6 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return;
}
- // prepare for the next slice to be read
- lastRow = Iterables.getLast(rows);
-
// remove ghosts when fetching all columns
if (isEmptyPredicate)
{
@@ -415,64 +412,49 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class WideRowIterator extends RowIterator
{
- private Iterator<ColumnOrSuperColumn> wideColumns;
+ private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> wideColumns;
private void maybeInit()
{
if (wideColumns != null && wideColumns.hasNext())
return;
- // check if we need another batch
- if (rows != null && ++i >= rows.size())
- rows = null;
-
- if (rows != null)
- {
- wideColumns = rows.get(i).columns.iterator();
- return;
- }
-
- String startToken;
+ KeyRange keyRange;
ByteBuffer startColumn;
- if (lastRow == null)
+ if (totalRead == 0)
{
- startToken = split.getStartToken();
+ String startToken = split.getStartToken();
+ keyRange = new KeyRange(batchSize)
+ .setStart_token(startToken)
+ .setEnd_token(split.getEndToken())
+ .setRow_filter(filter);
startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
}
else
{
- startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+ KeySlice lastRow = Iterables.getLast(rows);
+ logger.debug("Starting with last-seen row {}", lastRow.key);
+ keyRange = new KeyRange(batchSize)
+ .setStart_key(lastRow.key)
+ .setEnd_token(split.getEndToken())
+ .setRow_filter(filter);
startColumn = Iterables.getLast(lastRow.columns).column.name;
}
- KeyRange keyRange = new KeyRange(batchSize)
- .setStart_token(startToken)
- .setEnd_token(split.getEndToken())
- .setRow_filter(filter);
try
{
rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel);
-
- // nothing found?
- if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty())
- {
- rows = null;
- return;
- }
-
- // nothing new? reached the end
- if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.name.equals(startColumn)))
- {
+ int n = 0;
+ for (KeySlice row : rows)
+ n += row.columns.size();
+ logger.debug("read {} columns in {} rows for {} starting with {}",
+ new Object[]{ n, rows.size(), keyRange, startColumn });
+
+ wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows));
+ if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(startColumn))
+ wideColumns.next();
+ if (!wideColumns.hasNext())
rows = null;
- return;
- }
-
- // prepare for the next slice to be read
- lastRow = Iterables.getLast(rows);
-
- // reset to iterate through this new batch
- i = 0;
- wideColumns = rows.get(i).columns.iterator();
}
catch (Exception e)
{
@@ -487,9 +469,47 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return endOfData();
totalRead++;
- ColumnOrSuperColumn cosc = wideColumns.next();
- ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc));
- return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(rows.get(i).key, map);
+ return wideColumns.next();
+ }
+
+ private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+ {
+ private final Iterator<KeySlice> rows;
+ private Iterator<ColumnOrSuperColumn> columns;
+ public KeySlice currentRow;
+
+ public WideColumnIterator(List<KeySlice> rows)
+ {
+ this.rows = rows.iterator();
+ if (this.rows.hasNext())
+ nextRow();
+ else
+ columns = Iterators.emptyIterator();
+ }
+
+ private void nextRow()
+ {
+ currentRow = rows.next();
+ columns = currentRow.columns.iterator();
+ }
+
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+ {
+ while (true)
+ {
+ if (columns.hasNext())
+ {
+ ColumnOrSuperColumn cosc = columns.next();
+ ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftifySimple(cosc.column));
+ return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(currentRow.key, map);
+ }
+
+ if (!rows.hasNext())
+ return endOfData();
+
+ nextRow();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7aceb0e..7cb77d7 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -733,6 +733,7 @@ public class CassandraServer implements Cassandra.Iface
AbstractBounds<RowPosition> bounds;
if (range.start_key == null)
{
+ // (token, key) is unsupported, assume (token, token)
Token.TokenFactory tokenFactory = p.getTokenFactory();
Token left = tokenFactory.fromString(range.start_token);
Token right = tokenFactory.fromString(range.end_token);
@@ -740,7 +741,9 @@ public class CassandraServer implements Cassandra.Iface
}
else
{
- bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+ RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token).maxKeyBound(p)
+ : RowPosition.forKey(range.end_key, p);
+ bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
}
List<Row> rows;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 25c751c..a77bfb6 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -479,20 +479,17 @@ public class ThriftValidation
public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException
{
- if ((range.start_key == null) != (range.end_key == null))
- {
- throw new InvalidRequestException("start key and end key must either both be non-null, or both be null");
- }
- if ((range.start_token == null) != (range.end_token == null))
- {
- throw new InvalidRequestException("start token and end token must either both be non-null, or both be null");
- }
- if ((range.start_key == null) == (range.start_token == null))
+ if ((range.start_key == null) == (range.start_token == null)
+ || (range.end_key == null) == (range.end_token == null))
{
throw new InvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified");
}
- if (range.start_key != null)
+ // (key, token) is supported (for wide-row CFRR) but not (token, key)
+ if (range.start_token != null && range.end_key != null)
+ throw new InvalidRequestException("start token + end key is not a supported key range");
+
+ if (range.start_key != null && range.end_key != null)
{
IPartitioner p = StorageService.getPartitioner();
Token startToken = p.getToken(range.start_key);
@@ -510,7 +507,7 @@ public class ThriftValidation
if (!isEmpty(range.row_filter) && superColumn != null)
{
- throw new InvalidRequestException("super columns are not yet supported for indexing");
+ throw new InvalidRequestException("super columns are not supported for indexing");
}
if (range.count <= 0)