You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/23 23:04:52 UTC
[6/9] git commit: propagate range filter to ColumnFamilyRecordReader
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2878
propagate range filter to ColumnFamilyRecordReader
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2878
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d51d6f0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d51d6f0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d51d6f0b
Branch: refs/heads/trunk
Commit: d51d6f0b43b006b40ec3c1d6d13e2155b58136c0
Parents: b08c675
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jan 20 14:05:07 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jan 23 16:00:44 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +
.../cassandra/hadoop/ColumnFamilyInputFormat.java | 2 +-
.../cassandra/hadoop/ColumnFamilyRecordReader.java | 6 +++-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 25 ++++++++++++--
.../apache/cassandra/thrift/ThriftValidation.java | 7 ----
7 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a8ee42..1e51f54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
* Use faster bytes comparison (CASSANDRA-3434)
* Bulk loader is no longer a fat client, (HADOOP) bulk load output format
(CASSANDRA-3045)
+ * (Hadoop) add support for KeyRange.filter (CASSANDRA-2878)
* remove assumption that keys and token are in bijection
(CASSANDRA-1034, 3574, 3604)
* always remove endpoints from delivery queue in HH (CASSANDRA-3546)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index e685b12..0b227f3 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -55,6 +55,7 @@ Features
be pinned to specific media.
- Hadoop: a new BulkOutputFormat is included which will directly write
SSTables locally and then stream them into the cluster.
+ - Hadoop: KeyRange.filter is now supported with ColumnFamilyInputFormat
- The bulk loader is no longer a fat client; it can be run from an
existing machine in a cluster.
- A new write survey mode has been added, similar to bootstrap (enabled via
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 98223f7..7ff46c9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1304,6 +1304,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
{
+ if (logger.isDebugEnabled())
+ logger.debug("Filtering {} for rows matching {}", rowIterator, filter);
List<Row> rows = new ArrayList<Row>();
int columnsCount = 0;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index c13e881..8abc460 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -124,7 +124,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
IPartitioner partitioner = null;
Range<Token> jobRange = null;
- if (jobKeyRange != null)
+ if (jobKeyRange != null && jobKeyRange.start_token != null)
{
partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index cbe2b3c..46b767a 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -68,6 +68,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private Cassandra.Client client;
private ConsistencyLevel consistencyLevel;
private int keyBufferSize = 8192;
+ private List<IndexExpression> filter;
public ColumnFamilyRecordReader()
{
@@ -131,6 +132,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
{
this.split = (ColumnFamilySplit) split;
Configuration conf = context.getConfiguration();
+ KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
+ filter = jobRange == null ? null : jobRange.row_filter;
predicate = ConfigHelper.getInputSlicePredicate(conf);
isEmptyPredicate = isEmptyPredicate(predicate);
totalRowCount = ConfigHelper.getInputSplitSize(conf);
@@ -283,7 +286,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
KeyRange keyRange = new KeyRange(batchRowCount)
.setStart_token(startToken)
- .setEnd_token(split.getEndToken());
+ .setEnd_token(split.getEndToken())
+ .setRow_filter(filter);
try
{
rows = client.get_range_slices(new ColumnParent(cfName),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 63eec8c..9d89a79 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -25,10 +25,7 @@ import java.util.List;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
import org.apache.hadoop.conf.Configuration;
@@ -225,6 +222,26 @@ public class ConfigHelper
conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
}
+ /**
+ * Set the KeyRange (start token, end token and row filter) to limit the rows.
+ * @param conf Job configuration you are about to run
+ */
+ public static void setInputRange(Configuration conf, String startToken, String endToken, List<IndexExpression> filter)
+ {
+ KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken).setRow_filter(filter);
+ conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+ }
+
+ /**
+ * Set a KeyRange carrying only a row filter (no token bounds) to limit the rows.
+ * @param conf Job configuration you are about to run
+ */
+ public static void setInputRange(Configuration conf, List<IndexExpression> filter)
+ {
+ KeyRange range = new KeyRange().setRow_filter(filter);
+ conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+ }
+
/** may be null if unset */
public static KeyRange getInputKeyRange(Configuration conf)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 31035e3..b1ab665 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -514,13 +514,6 @@ public class ThriftValidation
{
throw new InvalidRequestException("super columns are not yet supported for indexing");
}
- if (!isEmpty(range.row_filter) && range.start_key == null)
- {
- // TODO: our current KEYS indexes can't do that efficiently
- // (without scanning *all* the keys in the range and simply applying the filter to discard them when they don't match)
- // See KeySearcher.search()
- throw new InvalidRequestException("filtered queries must use concrete keys rather than tokens");
- }
if (range.count <= 0)
{