You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/10 19:37:37 UTC
svn commit: r1200471 - in /cassandra/branches/cassandra-0.8: CHANGES.txt
src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Author: brandonwilliams
Date: Thu Nov 10 18:37:37 2011
New Revision: 1200471
URL: http://svn.apache.org/viewvc?rev=1200471&view=rev
Log:
Skip empty rows when entire row is requested, redux.
Patch by tjake, reviewed by brandonwilliams for CASSANDRA-2855
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1200471&r1=1200470&r2=1200471&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Nov 10 18:37:37 2011
@@ -38,6 +38,7 @@
* fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
* Make counter shard merging thread safe (CASSANDRA-3178)
* Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
+ * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
0.8.7
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1200471&r1=1200470&r2=1200471&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu Nov 10 18:37:37 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -55,6 +56,7 @@ public class ColumnFamilyRecordReader ex
private RowIterator iter;
private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
private SlicePredicate predicate;
+ private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
private int batchRowCount; // fetch this many per batch
private String cfName;
@@ -89,11 +91,33 @@ public class ColumnFamilyRecordReader ex
return ((float)iter.rowsRead()) / totalRowCount;
}
+ /**
+ * Whether the predicate requests entire rows: it is null, or it has no column_names and
+ * its slice_range (if any) has an empty start and finish. The record reader uses this to
+ * decide whether rows with no columns ("ghosts") may be dropped. Emptiness is tested by
+ * length, not by reference: getStart()/getFinish() need not return ArrayUtils.EMPTY_BYTE_ARRAY.
+ */
+ static boolean isEmptyPredicate(SlicePredicate predicate)
+ {
+ if (predicate == null)
+ return true;
+
+ SliceRange range = predicate.getSlice_range();
+ if (range == null)
+ return !predicate.isSetColumn_names();
+
+ byte[] start = range.getStart();
+ byte[] finish = range.getFinish();
+ return (start == null || start.length == 0)
+ && (finish == null || finish.length == 0);
+ }
+
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
Configuration conf = context.getConfiguration();
predicate = ConfigHelper.getInputSlicePredicate(conf);
+ isEmptyPredicate = isEmptyPredicate(predicate);
totalRowCount = ConfigHelper.getInputSplitSize(conf);
batchRowCount = ConfigHelper.getRangeBatchSize(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
@@ -237,6 +261,7 @@ public class ColumnFamilyRecordReader ex
}
else if (startToken.equals(split.getEndToken()))
{
+ // reached end of the split
rows = null;
return;
}
@@ -257,14 +282,37 @@ public class ColumnFamilyRecordReader ex
rows = null;
return;
}
-
- // reset to iterate through this new batch
- i = 0;
// prepare for the next slice to be read
KeySlice lastRow = rows.get(rows.size() - 1);
ByteBuffer rowkey = lastRow.key;
startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
+
+ // remove ghosts when fetching all columns
+ if (isEmptyPredicate)
+ {
+ Iterator<KeySlice> it = rows.iterator();
+
+ while(it.hasNext())
+ {
+ KeySlice ks = it.next();
+
+ if (ks.getColumnsSize() == 0)
+ {
+ it.remove();
+ }
+ }
+
+ // all ghosts, spooky
+ if (rows.isEmpty())
+ {
+ maybeInit();
+ return;
+ }
+ }
+
+ // reset to iterate through this new batch
+ i = 0;
}
catch (Exception e)
{