You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:58 UTC

svn commit: r759001 - in /incubator/cassandra/trunk: src/org/apache/cassandra/config/ src/org/apache/cassandra/db/ src/org/apache/cassandra/service/ src/org/apache/cassandra/utils/ test/org/apache/cassandra/service/

Author: jbellis
Date: Fri Mar 27 02:44:57 2009
New Revision: 759001

URL: http://svn.apache.org/viewvc?rev=759001&view=rev
Log:
range query support

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 27 02:44:57 2009
@@ -18,21 +18,24 @@
 
 package org.apache.cassandra.config;
 
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.io.*;
 
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TypeInfo;
-import org.apache.cassandra.db.DBManager;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.db.Table.TableMetadata;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.XMLUtils;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
-import org.apache.cassandra.io.*;
 
 
 /**
@@ -306,6 +309,9 @@
             /* Read the table related stuff from config */
             NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
             int size = tables.getLength();
+            if (size == 0) {
+                throw new UnsupportedOperationException("A Table must be configured");
+            }
             for ( int i = 0; i < size; ++i )
             {
                 Node table = tables.item(i);
@@ -785,4 +791,9 @@
     {
         return tableToCFMetaDataMap_;
     }
+
+    public static String getTableName()
+    {
+        return tables_.get(0);
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Mar 27 02:44:57 2009
@@ -288,7 +288,7 @@
         return columns_.getSortedColumns();
     }
 
-    Map<String, IColumn> getColumns()
+    public Map<String, IColumn> getColumns()
     {
         return columns_.getColumns();
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:57 2009
@@ -1384,4 +1384,14 @@
     {
         memtable_.get().flushOnRecovery();
     }
+
+    public Object getMemtable()
+    {
+        return memtable_.get();
+    }
+
+    public Set<String> getSSTableFilenames()
+    {
+        return Collections.unmodifiableSet(ssTables_);
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:44:57 2009
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Iterator;
+import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -45,6 +47,7 @@
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.DestructivePQIterator;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -419,4 +422,9 @@
         columnFamilies_.clear();
     }
 
+    public Iterator<String> sortedKeyIterator()
+    {
+        return new DestructivePQIterator<String>(new PriorityQueue<String>(columnFamilies_.keySet()));
+    }
+
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 02:44:57 2009
@@ -40,7 +40,7 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MemtableManager.class);
     private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
-    static MemtableManager instance() 
+    public static MemtableManager instance()
     {
         if ( instance_ == null )
         {
@@ -157,7 +157,22 @@
     	}
     }
 
-
-
+    public List<Memtable> getUnflushedMemtables(String cfName)
+    {
+        rwLock_.readLock().lock();
+        try
+        {
+            List<Memtable> memtables = history_.get(cfName);
+            if (memtables != null)
+            {
+                return new ArrayList<Memtable>(memtables);
+            }
+            return Arrays.asList(new Memtable[0]);
+        }
+        finally
+        {
+            rwLock_.readLock().unlock();
+        }
+    }
 
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 02:44:57 2009
@@ -60,7 +60,7 @@
      * is basically the column family name and the ID associated with
      * this column family. We use this ID in the Commit Log header to
      * determine when a log file that has been rolled can be deleted.
-    */    
+    */
     public static class TableMetadata
     {
         /* Name of the column family */
@@ -454,7 +454,7 @@
         return columnFamilyStores_;
     }
 
-    ColumnFamilyStore getColumnFamilyStore(String cfName)
+    public ColumnFamilyStore getColumnFamilyStore(String cfName)
     {
         return columnFamilyStores_.get(cfName);
     }
@@ -875,4 +875,17 @@
         long timeTaken = System.currentTimeMillis() - start;
         dbAnalyticsSource_.updateWriteStatistics(timeTaken);
     }
+
+    public Set<String> getApplicationColumnFamilies()
+    {
+        Set<String> set = new HashSet<String>();
+        for (String cfName : getColumnFamilies())
+        {
+            if (DatabaseDescriptor.isApplicationColumnFamily(cfName))
+            {
+                set.add(cfName);
+            }
+        }
+        return set;
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 02:44:57 2009
@@ -817,6 +817,71 @@
         return result;
     }
 
+    public List<String> get_range(String tablename, final String startkey) throws CassandraException
+    {
+        logger_.debug("get_range");
+
+        // send request
+        Message message;
+        DataOutputBuffer dob = new DataOutputBuffer();
+        try
+        {
+            dob.writeUTF(startkey);
+        }
+        catch (IOException e)
+        {
+            logger_.error("unable to write startkey", e);
+            throw new RuntimeException(e);
+        }
+        byte[] messageBody = Arrays.copyOf(dob.getData(), dob.getLength());
+        message = new Message(StorageService.getLocalStorageEndPoint(),
+                              StorageService.readStage_,
+                              StorageService.rangeVerbHandler_,
+                              messageBody);
+        EndPoint endPoint;
+        try
+        {
+            endPoint = StorageService.instance().findSuitableEndPoint(startkey);
+        }
+        catch (Exception e)
+        {
+            throw new CassandraException("Unable to find endpoint for " + startkey);
+        }
+        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+
+        // read response
+        // TODO send more requests if we need to span multiple nodes (or can we just let client worry about that,
+        // since they have to handle multiple requests anyway?)
+        byte[] responseBody;
+        try
+        {
+            responseBody = (byte[]) iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
+        }
+        catch (TimeoutException e)
+        {
+            throw new RuntimeException(e);
+        }
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(responseBody, responseBody.length);
+
+        // turn into List
+        List<String> keys = new ArrayList<String>();
+        while (bufIn.getPosition() < responseBody.length)
+        {
+            try
+            {
+                keys.add(bufIn.readUTF());
+            }
+            catch (IOException e)
+            {
+                logger_.error("bad utf", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        return keys;
+    }
+
     /*
      * This method is used to ensure that all keys
      * prior to the specified key, as dtermined by

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java?rev=759001&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java Fri Mar 27 02:44:57 2009
@@ -0,0 +1,152 @@
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
+
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.IdentityFilter;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.FileStruct;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.MemtableManager;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class RangeVerbHandler implements IVerbHandler
+{
+    public static final Comparator<String> STRING_COMPARATOR = new Comparator<String>()
+    {
+        public int compare(String o1, String o2)
+        {
+            return o1.compareTo(o2);
+        }
+    };
+
+    public void doVerb(Message message)
+    {
+        byte[] bytes = (byte[]) message.getMessageBody()[0];
+        final String startkey;
+        if (bytes.length == 0)
+        {
+            startkey = "";
+        }
+        else
+        {
+            DataInputBuffer dib = new DataInputBuffer();
+            dib.reset(bytes, bytes.length);
+            try
+            {
+                startkey = dib.readUTF();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
+        Table table = Table.open(DatabaseDescriptor.getTableName());
+        for (String cfName : table.getApplicationColumnFamilies())
+        {
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+            // memtable keys: current and historical
+            Iterator<Memtable> it = (Iterator<Memtable>) IteratorUtils.chainedIterator(
+                    IteratorUtils.singletonIterator(cfs.getMemtable()),
+                    MemtableManager.instance().getUnflushedMemtables(cfName).iterator());
+            while (it.hasNext())
+            {
+                iterators.add(IteratorUtils.filteredIterator(it.next().sortedKeyIterator(), new Predicate()
+                {
+                    public boolean evaluate(Object key)
+                    {
+                        return ((String) key).compareTo(startkey) >= 0;
+                    }
+                }));
+            }
+
+            // sstables
+            for (String filename : cfs.getSSTableFilenames())
+            {
+                try
+                {
+                    FileStruct fs = new FileStruct(SequenceFile.reader(filename));
+                    fs.seekTo(startkey);
+                    iterators.add(fs.iterator());
+                }
+                catch (FileNotFoundException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        Iterator<String> iter = IteratorUtils.collatedIterator(STRING_COMPARATOR, iterators);
+        List<String> keys = new ArrayList<String>();
+        String last = null, current = null;
+
+        while (keys.size() < 1000)
+        {
+            if (!iter.hasNext())
+            {
+                break;
+            }
+            current = iter.next();
+            if (!current.equals(last))
+            {
+                last = current;
+                for (String cfName : table.getApplicationColumnFamilies())
+                {
+                    ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+                    try
+                    {
+                        ColumnFamily cf = cfs.getColumnFamily(current, cfName, new IdentityFilter());
+                        if (cf != null && cf.getColumns().size() > 0)
+                        {
+                            keys.add(current);
+                            break;
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException();
+                    }
+                }
+            }
+        }
+
+        DataOutputBuffer dob = new DataOutputBuffer();
+        for (String key : keys)
+        {
+            try
+            {
+                dob.writeUTF(key);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
+        Message response = message.getReply(StorageService.getLocalStorageEndPoint(), data);
+        MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:57 2009
@@ -138,6 +138,7 @@
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
     public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
+    public static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
 
     public static enum ConsistencyLevel
     {

Added: incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java?rev=759001&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java Fri Mar 27 02:44:57 2009
@@ -0,0 +1,25 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+public class DestructivePQIterator<T> implements Iterator<T> {
+    private PriorityQueue<T> pq;
+
+    public DestructivePQIterator(PriorityQueue<T> pq) {
+        this.pq = pq;
+    }
+
+    public boolean hasNext() {
+        return pq.size() > 0;
+    }
+
+    public T next() {
+        return pq.poll();
+    }
+
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
+

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java Fri Mar 27 02:44:57 2009
@@ -9,8 +9,50 @@
 import java.io.IOException;
 import java.util.*;
 
+import com.facebook.thrift.TException;
+
 public class CassandraServerTest extends ServerTest {
     /*
+    TODO fix resetting server so this works
+    @Test
+    public void test_get_range_empty() throws IOException, TException {
+        CassandraServer server = new CassandraServer();
+        server.start();
+
+        assert CollectionUtils.EMPTY_COLLECTION.equals(server.get_range(DatabaseDescriptor.getTableName(), ""));
+    }
+    */
+
+    /*
+    @Test
+    public void test_get_range() throws IOException, TException, CassandraException
+    {
+        CassandraServer server = new CassandraServer();
+        try
+        {
+            server.start();
+        }
+        catch (Throwable throwable)
+        {
+            throw new RuntimeException(throwable);
+        }
+
+        // TODO insert some data
+        try {
+            String last = null;
+            for (String key : server.get_range(DatabaseDescriptor.getTableName(), "key1")) {
+                if (last != null) {
+                    assert last.compareTo(key) < 0;
+                }
+                last = key;
+            }
+        } finally {
+            server.shutdown();
+        }
+    }
+    */
+
+    /*
     @Test
     public void test_get_column() throws Throwable {
         CassandraServer server = new CassandraServer();