You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/19 21:52:26 UTC
svn commit: r965604 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/dht/Range.java
src/java/org/apache/cassandra/service/StorageProxy.java
test/system/test_server.py
Author: jbellis
Date: Mon Jul 19 19:52:26 2010
New Revision: 965604
URL: http://svn.apache.org/viewvc?rev=965604&view=rev
Log:
fix duplicate rows being read during mapreduce. patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1042
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.6/test/system/test_server.py
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 19 19:52:26 2010
@@ -12,6 +12,7 @@
* log errors in gossip instead of re-throwing (CASSANDRA-1289)
* avoid aborting commitlog replay prematurely if a flushed-but-
not-removed commitlog segment is encountered (CASSANDRA-1297)
+ * fix duplicate rows being read during mapreduce (CASSANDRA-1042)
0.6.3
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul 19 19:52:26 2010
@@ -895,7 +895,7 @@ public class ColumnFamilyStore implement
range_slice. still opens one randomaccessfile per key, which sucks. something like compactioniterator
would be better.
*/
- private boolean getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
+ private void getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
throws IOException, ExecutionException, InterruptedException
{
final DecoratedKey startWith = new DecoratedKey(range.left, null);
@@ -974,21 +974,22 @@ public class ColumnFamilyStore implement
{
if (!stopAt.isEmpty() && stopAt.compareTo(current) < 0)
{
- return true;
+ return;
}
if (range instanceof Bounds || !first || !current.equals(startWith))
{
+ if (logger_.isDebugEnabled())
+ logger_.debug("scanned " + current.key + " with token of " + StorageService.getPartitioner().getToken(current.key));
keys.add(current.key);
}
first = false;
if (keys.size() >= maxResults)
{
- return true;
+ return;
}
}
- return false;
}
finally
{
@@ -1017,23 +1018,10 @@ public class ColumnFamilyStore implement
throws IOException, ExecutionException, InterruptedException
{
List<String> keys = new ArrayList<String>();
- boolean completed;
- if ((range instanceof Bounds || !((Range)range).isWrapAround()))
- {
- completed = getKeyRange(keys, range, keyMax);
- }
- else
- {
- // wrapped range
- Token min = StorageService.getPartitioner().getMinimumToken();
- Range first = new Range(range.left, min);
- completed = getKeyRange(keys, first, keyMax);
- if (!completed && min.compareTo(range.right) < 0)
- {
- Range second = new Range(min, range.right);
- getKeyRange(keys, second, keyMax);
- }
- }
+ assert range instanceof Bounds
+ || (!((Range)range).isWrapAround() || range.right.equals(StorageService.getPartitioner().getMinimumToken()))
+ : range;
+ getKeyRange(keys, range, keyMax);
List<Row> rows = new ArrayList<Row>(keys.size());
final QueryPath queryPath = new QueryPath(columnFamily_, super_column, null);
final SortedSet<byte[]> columnNameSet = new TreeSet<byte[]>(getComparator());
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/dht/Range.java Mon Jul 19 19:52:26 2010
@@ -131,10 +131,10 @@ public class Range extends AbstractBound
*/
public Set<Range> intersectionWith(Range that)
{
- if (this.contains(that))
- return rangeSet(that);
if (that.contains(this))
return rangeSet(this);
+ if (this.contains(that))
+ return rangeSet(that);
boolean thiswraps = isWrapAround(left, right);
boolean thatwraps = isWrapAround(that.left, that.right);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 19 19:52:26 2010
@@ -659,10 +659,13 @@ public class StorageProxy implements Sto
* D, but we don't want any other results from it until after the (D, T] range. Unwrapping so that
* the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
*/
- private static List<AbstractBounds> getRestrictedRanges(AbstractBounds queryRange)
+ private static List<AbstractBounds> getRestrictedRanges(final AbstractBounds queryRange)
{
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+ if (logger.isDebugEnabled())
+ logger.debug("computing restricted ranges for query " + queryRange);
+
List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
// for each node, compute its intersection with the query range, and add its unwrapped components to our list
for (Token nodeToken : tokenMetadata.sortedTokens())
@@ -682,14 +685,23 @@ public class StorageProxy implements Sto
// re-sort ranges in ring order, post-unwrapping
Comparator<AbstractBounds> comparator = new Comparator<AbstractBounds>()
{
+ // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
+ // just sort by raw token position.
public int compare(AbstractBounds o1, AbstractBounds o2)
{
- // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
- // just sort by raw token position.
- return o1.left.compareTo(o2.left);
+ // sort in order that the original query range would see them.
+ int queryOrder1 = queryRange.left.compareTo(o1.left);
+ int queryOrder2 = queryRange.left.compareTo(o2.left);
+ if (queryOrder1 < queryOrder2)
+ return -1; // o1 comes after query start, o2 wraps to after
+ if (queryOrder1 > queryOrder2)
+ return 1; // o2 comes after query start, o1 wraps to after
+ return o1.left.compareTo(o2.left); // o1 and o2 are on the same side of query start
}
};
Collections.sort(ranges, comparator);
+ if (logger.isDebugEnabled())
+ logger.debug("Sorted ranges are [" + StringUtils.join(ranges, ", ") + "]");
return ranges;
}
Modified: cassandra/branches/cassandra-0.6/test/system/test_server.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/system/test_server.py?rev=965604&r1=965603&r2=965604&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/system/test_server.py (original)
+++ cassandra/branches/cassandra-0.6/test/system/test_server.py Mon Jul 19 19:52:26 2010
@@ -906,6 +906,26 @@ class TestMutations(CassandraTester):
assert result[1].columns[0].column.name == 'col1'
+ def test_wrapped_range_slices(self):
+ def copp_token(key):
+ # I cheated and generated this from Java
+ return {'a': '00530000000100000001',
+ 'b': '00540000000100000001',
+ 'c': '00550000000100000001',
+ 'd': '00560000000100000001',
+ 'e': '00580000000100000001'}[key]
+ for key in ['a', 'b', 'c', 'd', 'e']:
+ for cname in ['col1', 'col2', 'col3', 'col4', 'col5']:
+ client.insert('Keyspace1', key, ColumnPath('Standard1', column=cname), 'v-' + cname, 0, ConsistencyLevel.ONE)
+ cp = ColumnParent('Standard1')
+
+ result = client.get_range_slices("Keyspace1", cp, SlicePredicate(column_names=['col1', 'col3']), KeyRange(start_token=copp_token('e'), end_token=copp_token('e')), ConsistencyLevel.ONE)
+ assert [row.key for row in result] == ['a', 'b', 'c', 'd', 'e',], [row.key for row in result]
+
+ result = client.get_range_slices("Keyspace1", cp, SlicePredicate(column_names=['col1', 'col3']), KeyRange(start_token=copp_token('c'), end_token=copp_token('c')), ConsistencyLevel.ONE)
+ assert [row.key for row in result] == ['d', 'e', 'a', 'b', 'c',], [row.key for row in result]
+
+
def test_get_slice_by_names(self):
_insert_range()
p = SlicePredicate(column_names=['c1', 'c2'])