You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/19 21:52:26 UTC

svn commit: r965604 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/dht/Range.java src/java/org/apache/cassandra/service/StorageProxy.java test/system/test_server.py

Author: jbellis
Date: Mon Jul 19 19:52:26 2010
New Revision: 965604

URL: http://svn.apache.org/viewvc?rev=965604&view=rev
Log:
fix duplicate rows being read during mapreduce.  patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1042

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.6/test/system/test_server.py

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 19 19:52:26 2010
@@ -12,6 +12,7 @@
  * log errors in gossip instead of re-throwing (CASSANDRA-1289)
  * avoid aborting commitlog replay prematurely if a flushed-but-
    not-removed commitlog segment is encountered (CASSANDRA-1297)
+ * fix duplicate rows being read during mapreduce (CASSANDRA-1042)
 
 
 0.6.3

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul 19 19:52:26 2010
@@ -895,7 +895,7 @@ public class ColumnFamilyStore implement
        range_slice.  still opens one randomaccessfile per key, which sucks.  something like compactioniterator
        would be better.
      */
-    private boolean getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
+    private void getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
     throws IOException, ExecutionException, InterruptedException
     {
         final DecoratedKey startWith = new DecoratedKey(range.left, null);
@@ -974,21 +974,22 @@ public class ColumnFamilyStore implement
             {
                 if (!stopAt.isEmpty() && stopAt.compareTo(current) < 0)
                 {
-                    return true;
+                    return;
                 }
 
                 if (range instanceof Bounds || !first || !current.equals(startWith))
                 {
+                    if (logger_.isDebugEnabled())
+                        logger_.debug("scanned " + current.key + " with token of " + StorageService.getPartitioner().getToken(current.key));
                     keys.add(current.key);
                 }
                 first = false;
 
                 if (keys.size() >= maxResults)
                 {
-                    return true;
+                    return;
                 }
             }
-            return false;
         }
         finally
         {
@@ -1017,23 +1018,10 @@ public class ColumnFamilyStore implement
     throws IOException, ExecutionException, InterruptedException
     {
         List<String> keys = new ArrayList<String>();
-        boolean completed;
-        if ((range instanceof Bounds || !((Range)range).isWrapAround()))
-        {
-            completed = getKeyRange(keys, range, keyMax);
-        }
-        else
-        {
-            // wrapped range
-            Token min = StorageService.getPartitioner().getMinimumToken();
-            Range first = new Range(range.left, min);
-            completed = getKeyRange(keys, first, keyMax);
-            if (!completed && min.compareTo(range.right) < 0)
-            {
-                Range second = new Range(min, range.right);
-                getKeyRange(keys, second, keyMax);
-            }
-        }
+        assert range instanceof Bounds
+               || (!((Range)range).isWrapAround() || range.right.equals(StorageService.getPartitioner().getMinimumToken()))
+               : range;
+        getKeyRange(keys, range, keyMax);
         List<Row> rows = new ArrayList<Row>(keys.size());
         final QueryPath queryPath =  new QueryPath(columnFamily_, super_column, null);
         final SortedSet<byte[]> columnNameSet = new TreeSet<byte[]>(getComparator());

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java Mon Jul 19 19:52:26 2010
@@ -131,10 +131,10 @@ public class Range extends AbstractBound
      */
     public Set<Range> intersectionWith(Range that)
     {
-        if (this.contains(that))
-            return rangeSet(that);
         if (that.contains(this))
             return rangeSet(this);
+        if (this.contains(that))
+            return rangeSet(that);
 
         boolean thiswraps = isWrapAround(left, right);
         boolean thatwraps = isWrapAround(that.left, that.right);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 19 19:52:26 2010
@@ -659,10 +659,13 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<AbstractBounds> getRestrictedRanges(AbstractBounds queryRange)
+    private static List<AbstractBounds> getRestrictedRanges(final AbstractBounds queryRange)
     {
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
 
+        if (logger.isDebugEnabled())
+            logger.debug("computing restricted ranges for query " + queryRange);
+
         List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
         // for each node, compute its intersection with the query range, and add its unwrapped components to our list
         for (Token nodeToken : tokenMetadata.sortedTokens())
@@ -682,14 +685,23 @@ public class StorageProxy implements Sto
         // re-sort ranges in ring order, post-unwrapping
         Comparator<AbstractBounds> comparator = new Comparator<AbstractBounds>()
         {
+            // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left;
+            // order ranges by which side of the query start they fall on, then by raw token position.
             public int compare(AbstractBounds o1, AbstractBounds o2)
             {
-                // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
-                // just sort by raw token position.
-                return o1.left.compareTo(o2.left);
+                // sort in order that the original query range would see them.
+                int queryOrder1 = queryRange.left.compareTo(o1.left);
+                int queryOrder2 = queryRange.left.compareTo(o2.left);
+                if (queryOrder1 < queryOrder2)
+                    return -1; // o1 comes after query start, o2 wraps to after
+                if (queryOrder1 > queryOrder2)
+                    return 1; // o2 comes after query start, o1 wraps to after
+                return o1.left.compareTo(o2.left); // o1 and o2 are on the same side of query start
             }
         };
         Collections.sort(ranges, comparator);
+        if (logger.isDebugEnabled())
+            logger.debug("Sorted ranges are [" + StringUtils.join(ranges, ", ") + "]");
 
         return ranges;
     }

Modified: cassandra/branches/cassandra-0.6/test/system/test_server.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/system/test_server.py?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/system/test_server.py (original)
+++ cassandra/branches/cassandra-0.6/test/system/test_server.py Mon Jul 19 19:52:26 2010
@@ -906,6 +906,26 @@ class TestMutations(CassandraTester):
         assert result[1].columns[0].column.name == 'col1'
         
     
+    def test_wrapped_range_slices(self):
+        def copp_token(key):
+            # I cheated and generated this from Java
+            return {'a': '00530000000100000001', 
+                    'b': '00540000000100000001', 
+                    'c': '00550000000100000001',
+                    'd': '00560000000100000001', 
+                    'e': '00580000000100000001'}[key]
+        for key in ['a', 'b', 'c', 'd', 'e']:
+            for cname in ['col1', 'col2', 'col3', 'col4', 'col5']:
+                client.insert('Keyspace1', key, ColumnPath('Standard1', column=cname), 'v-' + cname, 0, ConsistencyLevel.ONE)
+        cp = ColumnParent('Standard1')
+
+        result = client.get_range_slices("Keyspace1", cp, SlicePredicate(column_names=['col1', 'col3']), KeyRange(start_token=copp_token('e'), end_token=copp_token('e')), ConsistencyLevel.ONE)
+        assert [row.key for row in result] == ['a', 'b', 'c', 'd', 'e',], [row.key for row in result]
+
+        result = client.get_range_slices("Keyspace1", cp, SlicePredicate(column_names=['col1', 'col3']), KeyRange(start_token=copp_token('c'), end_token=copp_token('c')), ConsistencyLevel.ONE)
+        assert [row.key for row in result] == ['d', 'e', 'a', 'b', 'c',], [row.key for row in result]
+        
+
     def test_get_slice_by_names(self):
         _insert_range()
         p = SlicePredicate(column_names=['c1', 'c2'])