You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/10 19:37:37 UTC

svn commit: r1200471 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

Author: brandonwilliams
Date: Thu Nov 10 18:37:37 2011
New Revision: 1200471

URL: http://svn.apache.org/viewvc?rev=1200471&view=rev
Log:
Skip empty rows when entire row is requested, redux.
Patch by tjake, reviewed by brandonwilliams for CASSANDRA-2855

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1200471&r1=1200470&r2=1200471&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Nov 10 18:37:37 2011
@@ -38,6 +38,7 @@
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
  * Make counter shard merging thread safe (CASSANDRA-3178)
  * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
+ * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
 
 
 0.8.7

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1200471&r1=1200470&r2=1200471&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu Nov 10 18:37:37 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -55,6 +56,7 @@ public class ColumnFamilyRecordReader ex
     private RowIterator iter;
     private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
     private SlicePredicate predicate;
+    private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
     private int batchRowCount; // fetch this many per batch
     private String cfName;
@@ -89,11 +91,33 @@ public class ColumnFamilyRecordReader ex
         return ((float)iter.rowsRead()) / totalRowCount;
     }
     
+    /** True if predicate is null, has neither column names nor a slice range, or its slice range has null/empty start and finish. */
+    static boolean isEmptyPredicate(SlicePredicate predicate)
+    {
+        if (predicate == null)
+            return true;
+        // column names are set and there is no slice range, so this is not a whole-row request
+        if (predicate.isSetColumn_names() && predicate.getSlice_range() == null)
+            return false;
+
+        if (predicate.getSlice_range() == null)
+            return true;
+        // test length, not identity: an empty array need not be the ArrayUtils.EMPTY_BYTE_ARRAY instance
+        byte[] start  = predicate.getSlice_range().getStart();
+        byte[] finish = predicate.getSlice_range().getFinish();
+        if ( (start == null || start.length == 0) &&
+             (finish == null || finish.length == 0) )
+            return true;
+
+        return false;
+    }
+    
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
         Configuration conf = context.getConfiguration();
         predicate = ConfigHelper.getInputSlicePredicate(conf);
+        isEmptyPredicate = isEmptyPredicate(predicate);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
         batchRowCount = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
@@ -237,6 +261,7 @@ public class ColumnFamilyRecordReader ex
             } 
             else if (startToken.equals(split.getEndToken()))
             {
+                // reached end of the split
                 rows = null;
                 return;
             }
@@ -257,14 +282,37 @@ public class ColumnFamilyRecordReader ex
                     rows = null;
                     return;
                 }
-                               
-                // reset to iterate through this new batch
-                i = 0;
                 
                 // prepare for the next slice to be read
                 KeySlice lastRow = rows.get(rows.size() - 1);
                 ByteBuffer rowkey = lastRow.key;
                 startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
+                
+                // remove ghosts when fetching all columns
+                if (isEmptyPredicate)
+                {
+                    Iterator<KeySlice> it = rows.iterator();
+                    
+                    while(it.hasNext())
+                    {
+                        KeySlice ks = it.next();
+                        
+                        if (ks.getColumnsSize() == 0)
+                        {
+                           it.remove();
+                        }
+                    }
+                
+                    // all ghosts, spooky
+                    if (rows.isEmpty())
+                    {
+                        maybeInit();
+                        return;
+                    }
+                }
+                
+                // reset to iterate through this new batch
+                i = 0;             
             }
             catch (Exception e)
             {