You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/07 19:35:26 UTC
svn commit: r802098 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Fri Aug 7 17:35:25 2009
New Revision: 802098
URL: http://svn.apache.org/viewvc?rev=802098&view=rev
Log:
move getKeyRange to CFS, where it encapsulates better.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-345
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 7 17:35:25 2009
@@ -43,7 +43,9 @@
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -1458,6 +1460,117 @@
}
}
+ /**
+ * @param startWith key to start with, inclusive. empty string = start at beginning.
+ * @param stopAt key to stop at, inclusive. empty string = stop only when keys are exhausted.
+ * @param maxResults
+ * @return list of keys between startWith and stopAt
+ */
+ public RangeReply getKeyRange(final String startWith, final String stopAt, int maxResults)
+ throws IOException, ExecutionException, InterruptedException
+ {
+ getReadLock().lock();
+ try
+ {
+ return getKeyRangeUnsafe(startWith, stopAt, maxResults);
+ }
+ finally
+ {
+ getReadLock().unlock();
+ }
+ }
+
+ private RangeReply getKeyRangeUnsafe(final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
+ {
+ // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
+ final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
+
+ // create a CollatedIterator that will return unique keys from different sources
+ // (current memtable, historical memtables, and SSTables) in the correct order.
+ List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
+
+ // we iterate through memtables with a priority queue to avoid more sorting than necessary.
+ // this predicate throws out the keys before the start of our range.
+ Predicate p = new Predicate()
+ {
+ public boolean evaluate(Object key)
+ {
+ String st = (String)key;
+ return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
+ }
+ };
+
+ // current memtable keys. have to go through the CFS api for locking.
+ iterators.add(IteratorUtils.filteredIterator(memtableKeyIterator(), p));
+ // historical memtables
+ for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
+ {
+ iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
+ }
+
+ // sstables
+ for (SSTableReader sstable : getSSTables())
+ {
+ FileStruct fs = sstable.getFileStruct();
+ fs.seekTo(startWith);
+ iterators.add(fs);
+ }
+
+ Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
+ Iterable<String> reduced = new ReducingIterator<String>(collated) {
+ String current;
+
+ public void reduce(String current)
+ {
+ this.current = current;
+ }
+
+ protected String getReduced()
+ {
+ return current;
+ }
+ };
+
+ try
+ {
+ // pull keys out of the CollatedIterator. checking tombstone status is expensive,
+ // so we set an arbitrary limit on how many we'll do at once.
+ List<String> keys = new ArrayList<String>();
+ boolean rangeCompletedLocally = false;
+ for (String current : reduced)
+ {
+ if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
+ {
+ rangeCompletedLocally = true;
+ break;
+ }
+ // make sure there is actually non-tombstone content associated w/ this key
+ // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
+ QueryFilter filter = new SliceQueryFilter(current, new QueryPath(columnFamily_), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
+ if (getColumnFamily(filter, Integer.MAX_VALUE) != null)
+ {
+ keys.add(current);
+ }
+ if (keys.size() >= maxResults)
+ {
+ rangeCompletedLocally = true;
+ break;
+ }
+ }
+ return new RangeReply(keys, rangeCompletedLocally);
+ }
+ finally
+ {
+ for (Iterator iter : iterators)
+ {
+ if (iter instanceof FileStruct)
+ {
+ ((FileStruct)iter).close();
+ }
+ }
+ }
+ }
+
public AbstractType getComparator()
{
return DatabaseDescriptor.getComparator(table_, columnFamily_);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Aug 7 17:35:25 2009
@@ -664,120 +664,6 @@
return applicationColumnFamilies_;
}
- /**
- * @param startWith key to start with, inclusive. empty string = start at beginning.
- * @param stopAt key to stop at, inclusive. empty string = stop only when keys are exhausted.
- * @param maxResults
- * @return list of keys between startWith and stopAt
- */
- public RangeReply getKeyRange(String columnFamily, final String startWith, final String stopAt, int maxResults)
- throws IOException, ExecutionException, InterruptedException
- {
- assert getColumnFamilyStore(columnFamily) != null : columnFamily;
-
- getColumnFamilyStore(columnFamily).getReadLock().lock();
- try
- {
- return getKeyRangeUnsafe(columnFamily, startWith, stopAt, maxResults);
- }
- finally
- {
- getColumnFamilyStore(columnFamily).getReadLock().unlock();
- }
- }
-
- private RangeReply getKeyRangeUnsafe(final String cfName, final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
- {
- // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
- final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
-
- // create a CollatedIterator that will return unique keys from different sources
- // (current memtable, historical memtables, and SSTables) in the correct order.
- List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
- ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
-
- // we iterate through memtables with a priority queue to avoid more sorting than necessary.
- // this predicate throws out the keys before the start of our range.
- Predicate p = new Predicate()
- {
- public boolean evaluate(Object key)
- {
- String st = (String)key;
- return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
- }
- };
-
- // current memtable keys. have to go through the CFS api for locking.
- iterators.add(IteratorUtils.filteredIterator(cfs.memtableKeyIterator(), p));
- // historical memtables
- for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(cfName))
- {
- iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
- }
-
- // sstables
- for (SSTableReader sstable : cfs.getSSTables())
- {
- FileStruct fs = sstable.getFileStruct();
- fs.seekTo(startWith);
- iterators.add(fs);
- }
-
- Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
- Iterable<String> reduced = new ReducingIterator<String>(collated) {
- String current;
-
- public void reduce(String current)
- {
- this.current = current;
- }
-
- protected String getReduced()
- {
- return current;
- }
- };
-
- try
- {
- // pull keys out of the CollatedIterator. checking tombstone status is expensive,
- // so we set an arbitrary limit on how many we'll do at once.
- List<String> keys = new ArrayList<String>();
- boolean rangeCompletedLocally = false;
- for (String current : reduced)
- {
- if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
- {
- rangeCompletedLocally = true;
- break;
- }
- // make sure there is actually non-tombstone content associated w/ this key
- // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
- QueryFilter filter = new SliceQueryFilter(current, new QueryPath(cfName), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
- if (cfs.getColumnFamily(filter, Integer.MAX_VALUE) != null)
- {
- keys.add(current);
- }
- if (keys.size() >= maxResults)
- {
- rangeCompletedLocally = true;
- break;
- }
- }
- return new RangeReply(keys, rangeCompletedLocally);
- }
- finally
- {
- for (Iterator iter : iterators)
- {
- if (iter instanceof FileStruct)
- {
- ((FileStruct)iter).close();
- }
- }
- }
- }
-
public static String getSnapshotPath(String dataDirPath, String tableName, String snapshotName)
{
return dataDirPath + File.separator + tableName + File.separator + SNAPSHOT_SUBDIR_NAME + File.separator + snapshotName;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java Fri Aug 7 17:35:25 2009
@@ -38,7 +38,7 @@
RangeCommand command = RangeCommand.read(message);
Table table = Table.open(command.table);
- RangeReply rangeReply = table.getKeyRange(command.columnFamily, command.startWith, command.stopAt, command.maxResults);
+ RangeReply rangeReply = table.getColumnFamilyStore(command.columnFamily).getKeyRange(command.startWith, command.stopAt, command.maxResults);
Message response = rangeReply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + rangeReply + " to " + message.getMessageId() + "@" + message.getFrom());
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Aug 7 17:35:25 2009
@@ -52,7 +52,7 @@
inserted.add(key);
}
store.forceBlockingFlush();
- assertEquals(table.getKeyRange("Standard1", "", "", 10000).keys.size(), inserted.size());
+ assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
}
while (true)
{
@@ -64,6 +64,6 @@
{
store.doCompaction(store.getSSTables().size());
}
- assertEquals(table.getKeyRange("Standard1", "", "", 10000).keys.size(), inserted.size());
+ assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Fri Aug 7 17:35:25 2009
@@ -45,12 +45,12 @@
rm.apply();
inserted.add(key);
store.forceBlockingFlush();
- assertEquals(inserted.size(), table.getKeyRange(columnFamilyName, "", "", 10000).keys.size());
+ assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
}
Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
ft.get();
assertEquals(1, store.getSSTables().size());
- assertEquals(table.getKeyRange(columnFamilyName, "", "", 10000).keys.size(), inserted.size());
+ assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
}
@Test
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Fri Aug 7 17:35:25 2009
@@ -34,7 +34,7 @@
table1.getColumnFamilyStore("Standard1").clearUnsafe();
RecoveryManager.doRecovery();
- Set<String> foundKeys = new HashSet<String>(table1.getKeyRange("Standard1", "", "", 1000).keys);
+ Set<String> foundKeys = new HashSet<String>(table1.getColumnFamilyStore("Standard1").getKeyRange("", "", 1000).keys);
assert keys.equals(foundKeys);
}
}