You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:58 UTC
svn commit: r759001 - in /incubator/cassandra/trunk:
src/org/apache/cassandra/config/ src/org/apache/cassandra/db/
src/org/apache/cassandra/service/ src/org/apache/cassandra/utils/
test/org/apache/cassandra/service/
Author: jbellis
Date: Fri Mar 27 02:44:57 2009
New Revision: 759001
URL: http://svn.apache.org/viewvc?rev=759001&view=rev
Log:
range query support
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 27 02:44:57 2009
@@ -18,21 +18,24 @@
package org.apache.cassandra.config;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.io.*;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TypeInfo;
-import org.apache.cassandra.db.DBManager;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.db.Table.TableMetadata;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.XMLUtils;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import org.apache.cassandra.io.*;
/**
@@ -306,6 +309,9 @@
/* Read the table related stuff from config */
NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
int size = tables.getLength();
+ if (size == 0) {
+ throw new UnsupportedOperationException("A Table must be configured");
+ }
for ( int i = 0; i < size; ++i )
{
Node table = tables.item(i);
@@ -785,4 +791,9 @@
{
return tableToCFMetaDataMap_;
}
+
+    /**
+     * Returns the element stored at index 0 of tables_.
+     * NOTE(review): presumably this is the first table declared in the storage
+     * configuration -- confirm against the code that populates tables_.
+     *
+     * @return the first entry of tables_
+     */
+    public static String getTableName()
+    {
+        final String firstTable = tables_.get(0);
+        return firstTable;
+    }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Mar 27 02:44:57 2009
@@ -288,7 +288,7 @@
return columns_.getSortedColumns();
}
- Map<String, IColumn> getColumns()
+ public Map<String, IColumn> getColumns()
{
return columns_.getColumns();
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:57 2009
@@ -1384,4 +1384,14 @@
{
memtable_.get().flushOnRecovery();
}
+
+    /**
+     * Exposes whatever memtable_ currently holds.
+     *
+     * @return the value obtained from memtable_.get(), typed as Object
+     */
+    public Object getMemtable()
+    {
+        final Object current = memtable_.get();
+        return current;
+    }
+
+    /**
+     * Gives callers read access to ssTables_ without letting them modify it.
+     *
+     * @return an unmodifiable view over ssTables_ (a view, not a copy)
+     */
+    public Set<String> getSSTableFilenames()
+    {
+        final Set<String> readOnlyView = Collections.unmodifiableSet(ssTables_);
+        return readOnlyView;
+    }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:44:57 2009
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Iterator;
+import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -45,6 +47,7 @@
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.DestructivePQIterator;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -419,4 +422,9 @@
columnFamilies_.clear();
}
+    /**
+     * Builds an iterator over a snapshot of the keys of columnFamilies_.
+     * The keys are copied into a fresh PriorityQueue which is then wrapped in a
+     * DestructivePQIterator, so consuming the iterator empties only that private
+     * queue and leaves columnFamilies_ untouched.
+     *
+     * @return an iterator backed by a private priority queue of the current keys
+     */
+    public Iterator<String> sortedKeyIterator()
+    {
+        PriorityQueue<String> keyHeap = new PriorityQueue<String>(columnFamilies_.keySet());
+        Iterator<String> drainingIterator = new DestructivePQIterator<String>(keyHeap);
+        return drainingIterator;
+    }
+
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 02:44:57 2009
@@ -40,7 +40,7 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(MemtableManager.class);
private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
- static MemtableManager instance()
+ public static MemtableManager instance()
{
if ( instance_ == null )
{
@@ -157,7 +157,22 @@
}
}
-
-
+    /**
+     * Returns a snapshot of the memtables recorded in history_ under the given
+     * column family name.
+     * The lookup and the copy both happen while the read lock is held. The result
+     * is always a newly allocated ArrayList, whether or not an entry exists, so
+     * callers get the same list type in both cases.
+     *
+     * @param cfName column family name used as the key into history_
+     * @return a new list with the entries found for cfName; empty, never null,
+     *         when history_ has no entry for it
+     */
+    public List<Memtable> getUnflushedMemtables(String cfName)
+    {
+        rwLock_.readLock().lock();
+        try
+        {
+            List<Memtable> memtables = history_.get(cfName);
+            if (memtables == null)
+            {
+                // same mutable list type as the non-empty case below
+                return new ArrayList<Memtable>();
+            }
+            return new ArrayList<Memtable>(memtables);
+        }
+        finally
+        {
+            rwLock_.readLock().unlock();
+        }
+    }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 02:44:57 2009
@@ -60,7 +60,7 @@
* is basically the column family name and the ID associated with
* this column family. We use this ID in the Commit Log header to
* determine when a log file that has been rolled can be deleted.
- */
+ */
public static class TableMetadata
{
/* Name of the column family */
@@ -454,7 +454,7 @@
return columnFamilyStores_;
}
- ColumnFamilyStore getColumnFamilyStore(String cfName)
+ public ColumnFamilyStore getColumnFamilyStore(String cfName)
{
return columnFamilyStores_.get(cfName);
}
@@ -875,4 +875,17 @@
long timeTaken = System.currentTimeMillis() - start;
dbAnalyticsSource_.updateWriteStatistics(timeTaken);
}
+
+    /**
+     * Collects the column family names returned by getColumnFamilies() that
+     * DatabaseDescriptor.isApplicationColumnFamily accepts.
+     *
+     * @return a newly allocated set holding the accepted names
+     */
+    public Set<String> getApplicationColumnFamilies()
+    {
+        Set<String> applicationCfs = new HashSet<String>();
+        for (String name : getColumnFamilies())
+        {
+            if (!DatabaseDescriptor.isApplicationColumnFamily(name))
+            {
+                continue;
+            }
+            applicationCfs.add(name);
+        }
+        return applicationCfs;
+    }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 02:44:57 2009
@@ -817,6 +817,71 @@
return result;
}
+    /**
+     * Fetches a list of keys starting from startkey.
+     * Serializes startkey as UTF, wraps it in a Message built with
+     * StorageService.readStage_ and StorageService.rangeVerbHandler_, sends it to
+     * the endpoint returned by findSuitableEndPoint(startkey), waits up to twice
+     * the configured RPC timeout for the reply, and decodes the reply body as a
+     * run of UTF strings.
+     *
+     * @param tablename not read by this method
+     * @param startkey  key written into the request and used to pick the endpoint
+     * @return the strings decoded from the response body, in the order received
+     * @throws CassandraException if no endpoint can be found for startkey
+     * @throws RuntimeException   if the request cannot be encoded, the reply
+     *                            times out, or the reply cannot be decoded
+     */
+    public List<String> get_range(String tablename, final String startkey) throws CassandraException
+    {
+        logger_.debug("get_range");
+
+        // send request
+        DataOutputBuffer dob = new DataOutputBuffer();
+        try
+        {
+            dob.writeUTF(startkey);
+        }
+        catch (IOException e)
+        {
+            logger_.error("unable to write startkey", e);
+            throw new RuntimeException(e);
+        }
+        // getData() exposes the backing array; keep only the bytes actually written
+        byte[] messageBody = Arrays.copyOf(dob.getData(), dob.getLength());
+        Message message = new Message(StorageService.getLocalStorageEndPoint(),
+                                      StorageService.readStage_,
+                                      StorageService.rangeVerbHandler_,
+                                      messageBody);
+        EndPoint endPoint;
+        try
+        {
+            endPoint = StorageService.instance().findSuitableEndPoint(startkey);
+        }
+        catch (Exception e)
+        {
+            // the CassandraException below carries only a message, so record the
+            // underlying failure here instead of losing it
+            logger_.error("Unable to find endpoint for " + startkey, e);
+            throw new CassandraException("Unable to find endpoint for " + startkey);
+        }
+        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+
+        // read response
+        // TODO send more requests if we need to span multiple nodes (or can we just let client worry about that,
+        // since they have to handle multiple requests anyway?)
+        byte[] responseBody;
+        try
+        {
+            responseBody = (byte[]) iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
+        }
+        catch (TimeoutException e)
+        {
+            throw new RuntimeException(e);
+        }
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(responseBody, responseBody.length);
+
+        // turn into List
+        List<String> keys = new ArrayList<String>();
+        try
+        {
+            while (bufIn.getPosition() < responseBody.length)
+            {
+                keys.add(bufIn.readUTF());
+            }
+        }
+        catch (IOException e)
+        {
+            logger_.error("bad utf", e);
+            throw new RuntimeException(e);
+        }
+
+        return keys;
+    }
+
/*
* This method is used to ensure that all keys
* prior to the specified key, as determined by
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java?rev=759001&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java Fri Mar 27 02:44:57 2009
@@ -0,0 +1,152 @@
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
+
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.db.IdentityFilter;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.FileStruct;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.MemtableManager;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class RangeVerbHandler implements IVerbHandler
+{
+ public static final Comparator<String> STRING_COMPARATOR = new Comparator<String>()
+ {
+ public int compare(String o1, String o2)
+ {
+ return o1.compareTo(o2);
+ }
+ };
+
+ public void doVerb(Message message)
+ {
+ byte[] bytes = (byte[]) message.getMessageBody()[0];
+ final String startkey;
+ if (bytes.length == 0)
+ {
+ startkey = "";
+ }
+ else
+ {
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(bytes, bytes.length);
+ try
+ {
+ startkey = dib.readUTF();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
+ Table table = Table.open(DatabaseDescriptor.getTableName());
+ for (String cfName : table.getApplicationColumnFamilies())
+ {
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ // memtable keys: current and historical
+ Iterator<Memtable> it = (Iterator<Memtable>) IteratorUtils.chainedIterator(
+ IteratorUtils.singletonIterator(cfs.getMemtable()),
+ MemtableManager.instance().getUnflushedMemtables(cfName).iterator());
+ while (it.hasNext())
+ {
+ iterators.add(IteratorUtils.filteredIterator(it.next().sortedKeyIterator(), new Predicate()
+ {
+ public boolean evaluate(Object key)
+ {
+ return ((String) key).compareTo(startkey) >= 0;
+ }
+ }));
+ }
+
+ // sstables
+ for (String filename : cfs.getSSTableFilenames())
+ {
+ try
+ {
+ FileStruct fs = new FileStruct(SequenceFile.reader(filename));
+ fs.seekTo(startkey);
+ iterators.add(fs.iterator());
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ Iterator<String> iter = IteratorUtils.collatedIterator(STRING_COMPARATOR, iterators);
+ List<String> keys = new ArrayList<String>();
+ String last = null, current = null;
+
+ while (keys.size() < 1000)
+ {
+ if (!iter.hasNext())
+ {
+ break;
+ }
+ current = iter.next();
+ if (!current.equals(last))
+ {
+ last = current;
+ for (String cfName : table.getApplicationColumnFamilies())
+ {
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ try
+ {
+ ColumnFamily cf = cfs.getColumnFamily(current, cfName, new IdentityFilter());
+ if (cf != null && cf.getColumns().size() > 0)
+ {
+ keys.add(current);
+ break;
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException();
+ }
+ }
+ }
+ }
+
+ DataOutputBuffer dob = new DataOutputBuffer();
+ for (String key : keys)
+ {
+ try
+ {
+ dob.writeUTF(key);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
+ Message response = message.getReply(StorageService.getLocalStorageEndPoint(), data);
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+ }
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:57 2009
@@ -138,6 +138,7 @@
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
+    /* verb name used by CassandraServer.get_range; final like the other verb-name constants */
+    public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
public static enum ConsistencyLevel
{
Added: incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java?rev=759001&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java Fri Mar 27 02:44:57 2009
@@ -0,0 +1,25 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that hands out the elements of a PriorityQueue in priority order by
+ * polling them off the queue. Iteration is destructive: every element returned
+ * is removed from the queue that was passed to the constructor.
+ */
+public class DestructivePQIterator<T> implements Iterator<T> {
+    private final PriorityQueue<T> pq;
+
+    /**
+     * @param pq the queue to drain; it is used directly, not copied
+     */
+    public DestructivePQIterator(PriorityQueue<T> pq) {
+        this.pq = pq;
+    }
+
+    /**
+     * @return true while the underlying queue still holds elements
+     */
+    public boolean hasNext() {
+        return !pq.isEmpty();
+    }
+
+    /**
+     * Removes and returns the head of the underlying queue.
+     *
+     * @return the least element according to the queue's ordering
+     * @throws java.util.NoSuchElementException if the queue is empty
+     */
+    public T next() {
+        // poll() returns null on an empty queue; the Iterator contract
+        // requires an exception instead
+        if (pq.isEmpty()) {
+            throw new java.util.NoSuchElementException();
+        }
+        return pq.poll();
+    }
+
+    /**
+     * Not supported; elements are already removed as they are returned.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
+
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java?rev=759001&r1=759000&r2=759001&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/service/CassandraServerTest.java Fri Mar 27 02:44:57 2009
@@ -9,8 +9,50 @@
import java.io.IOException;
import java.util.*;
+import com.facebook.thrift.TException;
+
public class CassandraServerTest extends ServerTest {
/*
+ TODO fix resetting server so this works
+ @Test
+ public void test_get_range_empty() throws IOException, TException {
+ CassandraServer server = new CassandraServer();
+ server.start();
+
+ assert CollectionUtils.EMPTY_COLLECTION.equals(server.get_range(DatabaseDescriptor.getTableName(), ""));
+ }
+ */
+
+ /*
+ @Test
+ public void test_get_range() throws IOException, TException, CassandraException
+ {
+ CassandraServer server = new CassandraServer();
+ try
+ {
+ server.start();
+ }
+ catch (Throwable throwable)
+ {
+ throw new RuntimeException(throwable);
+ }
+
+ // TODO insert some data
+ try {
+ String last = null;
+ for (String key : server.get_range(DatabaseDescriptor.getTableName(), "key1")) {
+ if (last != null) {
+ assert last.compareTo(key) < 0;
+ }
+ last = key;
+ }
+ } finally {
+ server.shutdown();
+ }
+ }
+ */
+
+ /*
@Test
public void test_get_column() throws Throwable {
CassandraServer server = new CassandraServer();