You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/23 23:04:52 UTC

[6/9] git commit: propagate range filter to ColumnFamilyRecordReader patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2878

propagate range filter to ColumnFamilyRecordReader
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2878


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d51d6f0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d51d6f0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d51d6f0b

Branch: refs/heads/trunk
Commit: d51d6f0b43b006b40ec3c1d6d13e2155b58136c0
Parents: b08c675
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jan 20 14:05:07 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jan 23 16:00:44 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +
 .../cassandra/hadoop/ColumnFamilyInputFormat.java  |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |    6 +++-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   25 ++++++++++++--
 .../apache/cassandra/thrift/ThriftValidation.java  |    7 ----
 7 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a8ee42..1e51f54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Use faster bytes comparison (CASSANDRA-3434)
  * Bulk loader is no longer a fat client, (HADOOP) bulk load output format
    (CASSANDRA-3045)
+ * (Hadoop) add support for KeyRange.row_filter (CASSANDRA-2878)
  * remove assumption that keys and token are in bijection
    (CASSANDRA-1034, 3574, 3604)
  * always remove endpoints from delevery queue in HH (CASSANDRA-3546)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index e685b12..0b227f3 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -55,6 +55,7 @@ Features
       be pinned to specfic media.
     - Hadoop: a new BulkOutputFormat is included which will directly write
       SSTables locally and then stream them into the cluster.
+    - Hadoop: KeyRange.row_filter is now supported with ColumnFamilyInputFormat.
     - The bulk loader is not longer a fat client; it can be run from an
       existing machine in a cluster.
     - A new write survey mode has been added, similar to bootstrap (enabled via

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 98223f7..7ff46c9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1304,6 +1304,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
     {
+        if (logger.isDebugEnabled())
+            logger.debug("Filtering {} for rows matching {}", rowIterator, filter);
          List<Row> rows = new ArrayList<Row>();
          int columnsCount = 0;
          try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index c13e881..8abc460 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -124,7 +124,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             IPartitioner partitioner = null;
             Range<Token> jobRange = null;
-            if (jobKeyRange != null)
+            if (jobKeyRange != null && jobKeyRange.start_token != null)
             {
                 partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
                 assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index cbe2b3c..46b767a 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -68,6 +68,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private Cassandra.Client client;
     private ConsistencyLevel consistencyLevel;
     private int keyBufferSize = 8192;
+    private List<IndexExpression> filter;
 
     public ColumnFamilyRecordReader()
     {
@@ -131,6 +132,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     {
         this.split = (ColumnFamilySplit) split;
         Configuration conf = context.getConfiguration();
+        KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
+        filter = jobRange == null ? null : jobRange.row_filter;
         predicate = ConfigHelper.getInputSlicePredicate(conf);
         isEmptyPredicate = isEmptyPredicate(predicate);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
@@ -283,7 +286,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             
             KeyRange keyRange = new KeyRange(batchRowCount)
                                 .setStart_token(startToken)
-                                .setEnd_token(split.getEndToken());
+                                .setEnd_token(split.getEndToken())
+                                .setRow_filter(filter);
             try
             {
                 rows = client.get_range_slices(new ColumnParent(cfName),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 63eec8c..9d89a79 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -25,10 +25,7 @@ import java.util.List;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.hadoop.conf.Configuration;
@@ -225,6 +222,26 @@ public class ConfigHelper
         conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
     }
 
+    /**
+     * Set the KeyRange to limit the rows.
+     * @param conf Job configuration you are about to run
+     */
+    public static void setInputRange(Configuration conf, String startToken, String endToken, List<IndexExpression> filter)
+    {
+        KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken).setRow_filter(filter);
+        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+    }
+
+    /**
+     * Set the KeyRange to limit the rows.
+     * @param conf Job configuration you are about to run
+     */
+    public static void setInputRange(Configuration conf, List<IndexExpression> filter)
+    {
+        KeyRange range = new KeyRange().setRow_filter(filter);
+        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+    }
+
     /** may be null if unset */
     public static KeyRange getInputKeyRange(Configuration conf)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 31035e3..b1ab665 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -514,13 +514,6 @@ public class ThriftValidation
         {
             throw new InvalidRequestException("super columns are not yet supported for indexing");
         }
-        if (!isEmpty(range.row_filter) && range.start_key == null)
-        {
-            // TODO: our current KEYS indexes can't do that efficiently
-            // (without scanning *all* the keys in the range and simply applying the filter to discard them when they don't match)
-            // See KeySearcher.search()
-            throw new InvalidRequestException("filtered queries must use concrete keys rather than tokens");
-        }
 
         if (range.count <= 0)
         {