You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/07 19:35:26 UTC

svn commit: r802098 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Fri Aug  7 17:35:25 2009
New Revision: 802098

URL: http://svn.apache.org/viewvc?rev=802098&view=rev
Log:
move getKeyRange to CFS, where it encapsulates better.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-345

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug  7 17:35:25 2009
@@ -43,7 +43,9 @@
 import org.apache.cassandra.db.marshal.AbstractType;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -1458,6 +1460,117 @@
         }
     }
 
+    /** Fetches a range of keys from this store: takes the read lock, then delegates to getKeyRangeUnsafe.
+     * @param startWith key to start with, inclusive.  empty string = start at beginning.
+     * @param stopAt key to stop at, inclusive.  empty string = stop only when keys are exhausted.
+     * @param maxResults maximum number of keys to return
+     * @return RangeReply holding the keys between startWith and stopAt, plus a "range completed locally" flag
+     */
+    public RangeReply getKeyRange(final String startWith, final String stopAt, int maxResults)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        getReadLock().lock();
+        try
+        {
+            return getKeyRangeUnsafe(startWith, stopAt, maxResults);
+        }
+        finally
+        {
+            getReadLock().unlock();
+        }
+    }
+
+    private RangeReply getKeyRangeUnsafe(final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
+    { // no locking in here: the visible caller, getKeyRange, holds the read lock around this call
+        // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
+        final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
+
+        // create a CollatedIterator that will return unique keys from different sources
+        // (current memtable, historical memtables, and SSTables) in the correct order.
+        List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
+
+        // we iterate through memtables with a priority queue to avoid more sorting than necessary.
+        // this predicate throws out the keys outside our range: before startWith, or after a non-empty stopAt.
+        Predicate p = new Predicate()
+        {
+            public boolean evaluate(Object key)
+            {
+                String st = (String)key;
+                return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
+            }
+        };
+
+        // current memtable keys.  have to go through the CFS api for locking.
+        iterators.add(IteratorUtils.filteredIterator(memtableKeyIterator(), p));
+        // historical memtables
+        for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
+        {
+            iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
+        }
+
+        // sstables: seek each one to startWith; these FileStructs are closed in the finally block below
+        for (SSTableReader sstable : getSSTables())
+        {
+            FileStruct fs = sstable.getFileStruct();
+            fs.seekTo(startWith);
+            iterators.add(fs);
+        }
+
+        Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
+        Iterable<String> reduced = new ReducingIterator<String>(collated) {
+            String current;
+
+            public void reduce(String current)
+            {
+                 this.current = current;
+            }
+
+            protected String getReduced()
+            {
+                return current;
+            }
+        };
+
+        try
+        {
+            // pull keys out of the CollatedIterator.  checking tombstone status is expensive,
+            // so we stop as soon as maxResults keys have been collected.
+            List<String> keys = new ArrayList<String>();
+            boolean rangeCompletedLocally = false;
+            for (String current : reduced)
+            {
+                if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
+                {
+                    rangeCompletedLocally = true; // current sorts after stopAt
+                    break;
+                }
+                // make sure there is actually non-tombstone content associated w/ this key
+                // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
+                QueryFilter filter = new SliceQueryFilter(current, new QueryPath(columnFamily_), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
+                if (getColumnFamily(filter, Integer.MAX_VALUE) != null)
+                {
+                    keys.add(current);
+                }
+                if (keys.size() >= maxResults)
+                {
+                    rangeCompletedLocally = true; // reached the maxResults cap
+                    break;
+                }
+            }
+            return new RangeReply(keys, rangeCompletedLocally);
+        }
+        finally
+        {
+            for (Iterator iter : iterators)
+            {
+                if (iter instanceof FileStruct)
+                {
+                    ((FileStruct)iter).close();
+                }
+            }
+        }
+    }
+
     public AbstractType getComparator()
     {
         return DatabaseDescriptor.getComparator(table_, columnFamily_);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Aug  7 17:35:25 2009
@@ -664,120 +664,6 @@
         return applicationColumnFamilies_;
     }
 
-    /**
-     * @param startWith key to start with, inclusive.  empty string = start at beginning.
-     * @param stopAt key to stop at, inclusive.  empty string = stop only when keys are exhausted.
-     * @param maxResults
-     * @return list of keys between startWith and stopAt
-     */
-    public RangeReply getKeyRange(String columnFamily, final String startWith, final String stopAt, int maxResults)
-    throws IOException, ExecutionException, InterruptedException
-    {
-        assert getColumnFamilyStore(columnFamily) != null : columnFamily;
-
-        getColumnFamilyStore(columnFamily).getReadLock().lock();
-        try
-        {
-            return getKeyRangeUnsafe(columnFamily, startWith, stopAt, maxResults);
-        }
-        finally
-        {
-            getColumnFamilyStore(columnFamily).getReadLock().unlock();
-        }
-    }
-
-    private RangeReply getKeyRangeUnsafe(final String cfName, final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
-    {
-        // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
-        final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
-
-        // create a CollatedIterator that will return unique keys from different sources
-        // (current memtable, historical memtables, and SSTables) in the correct order.
-        List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
-        ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
-
-        // we iterate through memtables with a priority queue to avoid more sorting than necessary.
-        // this predicate throws out the keys before the start of our range.
-        Predicate p = new Predicate()
-        {
-            public boolean evaluate(Object key)
-            {
-                String st = (String)key;
-                return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
-            }
-        };
-
-        // current memtable keys.  have to go through the CFS api for locking.
-        iterators.add(IteratorUtils.filteredIterator(cfs.memtableKeyIterator(), p));
-        // historical memtables
-        for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(cfName))
-        {
-            iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
-        }
-
-        // sstables
-        for (SSTableReader sstable : cfs.getSSTables())
-        {
-            FileStruct fs = sstable.getFileStruct();
-            fs.seekTo(startWith);
-            iterators.add(fs);
-        }
-
-        Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
-        Iterable<String> reduced = new ReducingIterator<String>(collated) {
-            String current;
-
-            public void reduce(String current)
-            {
-                 this.current = current;
-            }
-
-            protected String getReduced()
-            {
-                return current;
-            }
-        };
-
-        try
-        {
-            // pull keys out of the CollatedIterator.  checking tombstone status is expensive,
-            // so we set an arbitrary limit on how many we'll do at once.
-            List<String> keys = new ArrayList<String>();
-            boolean rangeCompletedLocally = false;
-            for (String current : reduced)
-            {
-                if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
-                {
-                    rangeCompletedLocally = true;
-                    break;
-                }
-                // make sure there is actually non-tombstone content associated w/ this key
-                // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
-                QueryFilter filter = new SliceQueryFilter(current, new QueryPath(cfName), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
-                if (cfs.getColumnFamily(filter, Integer.MAX_VALUE) != null)
-                {
-                    keys.add(current);
-                }
-                if (keys.size() >= maxResults)
-                {
-                    rangeCompletedLocally = true;
-                    break;
-                }
-            }
-            return new RangeReply(keys, rangeCompletedLocally);
-        }
-        finally
-        {
-            for (Iterator iter : iterators)
-            {
-                if (iter instanceof FileStruct)
-                {
-                    ((FileStruct)iter).close();
-                }
-            }
-        }
-    }
-
     public static String getSnapshotPath(String dataDirPath, String tableName, String snapshotName)
     {
         return dataDirPath + File.separator + tableName + File.separator + SNAPSHOT_SUBDIR_NAME + File.separator + snapshotName;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java Fri Aug  7 17:35:25 2009
@@ -38,7 +38,7 @@
             RangeCommand command = RangeCommand.read(message);
             Table table = Table.open(command.table);
 
-            RangeReply rangeReply = table.getKeyRange(command.columnFamily, command.startWith, command.stopAt, command.maxResults);
+            RangeReply rangeReply = table.getColumnFamilyStore(command.columnFamily).getKeyRange(command.startWith, command.stopAt, command.maxResults);
             Message response = rangeReply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + rangeReply + " to " + message.getMessageId() + "@" + message.getFrom());

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Aug  7 17:35:25 2009
@@ -52,7 +52,7 @@
                 inserted.add(key);
             }
             store.forceBlockingFlush();
-            assertEquals(table.getKeyRange("Standard1", "", "", 10000).keys.size(), inserted.size());
+            assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
         }
         while (true)
         {
@@ -64,6 +64,6 @@
         {
             store.doCompaction(store.getSSTables().size());
         }
-        assertEquals(table.getKeyRange("Standard1", "", "", 10000).keys.size(), inserted.size());
+        assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Fri Aug  7 17:35:25 2009
@@ -45,12 +45,12 @@
             rm.apply();
             inserted.add(key);
             store.forceBlockingFlush();
-            assertEquals(inserted.size(), table.getKeyRange(columnFamilyName, "", "", 10000).keys.size());
+            assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
         }
         Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
         ft.get();
         assertEquals(1, store.getSSTables().size());
-        assertEquals(table.getKeyRange(columnFamilyName, "", "", 10000).keys.size(), inserted.size());
+        assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
     }
 
     @Test

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=802098&r1=802097&r2=802098&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Fri Aug  7 17:35:25 2009
@@ -34,7 +34,7 @@
         table1.getColumnFamilyStore("Standard1").clearUnsafe();
         RecoveryManager.doRecovery();
 
-        Set<String> foundKeys = new HashSet<String>(table1.getKeyRange("Standard1", "", "", 1000).keys);
+        Set<String> foundKeys = new HashSet<String>(table1.getColumnFamilyStore("Standard1").getKeyRange("", "", 1000).keys);
         assert keys.equals(foundKeys);
     }
 }