You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/29 20:57:35 UTC

svn commit: r769874 - in /incubator/cassandra/trunk: interface/ interface/gen-java/org/apache/cassandra/service/ src/org/apache/cassandra/config/ src/org/apache/cassandra/db/ src/org/apache/cassandra/io/ src/org/apache/cassandra/service/ src/org/apache...

Author: jbellis
Date: Wed Apr 29 18:57:35 2009
New Revision: 769874

URL: http://svn.apache.org/viewvc?rev=769874&view=rev
Log:
Add range query support, which requires using an OrderPreservingPartitioner.  (Keys are returned in the order defined by the partitioner's collation.)  The fundamental approach (in table.getKeyRange) is simple: create a CollatedIterator that merges keys from several sources, each of which is already sorted, and returns each unique key once.  Then we just need an Iterator for each key source.  For SSTables, this means adding seekTo and an Iterator interface to FileStruct.  For Memtables, this means adding a DestructivePQIterator, since, unlike SSTable keys, Memtable keys are not already ordered.  As a result, we only do M log N work sorting the memtable keys, where M is the number of keys we actually read and N is the total number of keys, whereas a naive sort-everything-first iterator would do N log N work.

This does not yet implement range queries spanning multiple nodes.

patch by jbellis; reviewed by Jun Rao for CASSANDRA-71

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeReply.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
Modified:
    incubator/cassandra/trunk/interface/cassandra.thrift
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
    incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml
    incubator/cassandra/trunk/test/system/test_server.py

Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Wed Apr 29 18:57:35 2009
@@ -114,6 +114,9 @@
 
   async void     touch(1:string key, 2:bool fData),
 
+  # range query: returns matching keys
+  list<string>   get_key_range(1:string tablename, 2:string startWith="", 3:string stopAt="", 4:i32 maxResults=1000) throws (1: InvalidRequestException ire),
+
   /////////////////////////////////////////////////////////////////////////////////////
   // The following are beta APIs being introduced for CLI and/or CQL support.        //
   // These are still experimental, and subject to change.                            //

Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java Wed Apr 29 18:57:35 2009
@@ -54,6 +54,8 @@
 
     public void touch(String key, boolean fData) throws TException;
 
+    public List<String> get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws InvalidRequestException, TException;
+
     public String getStringProperty(String propertyName) throws TException;
 
     public List<String> getStringListProperty(String propertyName) throws TException;
@@ -634,6 +636,45 @@
       oprot_.getTransport().flush();
     }
 
+    public List<String> get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws InvalidRequestException, TException
+    {
+      send_get_key_range(tablename, startWith, stopAt, maxResults);
+      return recv_get_key_range();
+    }
+
+    public void send_get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("get_key_range", TMessageType.CALL, seqid_));
+      get_key_range_args args = new get_key_range_args();
+      args.tablename = tablename;
+      args.startWith = startWith;
+      args.stopAt = stopAt;
+      args.maxResults = maxResults;
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public List<String> recv_get_key_range() throws InvalidRequestException, TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      get_key_range_result result = new get_key_range_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      if (result.ire != null) {
+        throw result.ire;
+      }
+      throw new TApplicationException(TApplicationException.MISSING_RESULT, "get_key_range failed: unknown result");
+    }
+
     public String getStringProperty(String propertyName) throws TException
     {
       send_getStringProperty(propertyName);
@@ -788,6 +829,7 @@
       processMap_.put("batch_insert_superColumn", new batch_insert_superColumn());
       processMap_.put("batch_insert_superColumn_blocking", new batch_insert_superColumn_blocking());
       processMap_.put("touch", new touch());
+      processMap_.put("get_key_range", new get_key_range());
       processMap_.put("getStringProperty", new getStringProperty());
       processMap_.put("getStringListProperty", new getStringListProperty());
       processMap_.put("describeTable", new describeTable());
@@ -1214,6 +1256,34 @@
       }
     }
 
+    private class get_key_range implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+      {
+        get_key_range_args args = new get_key_range_args();
+        args.read(iprot);
+        iprot.readMessageEnd();
+        get_key_range_result result = new get_key_range_result();
+        try {
+          result.success = iface_.get_key_range(args.tablename, args.startWith, args.stopAt, args.maxResults);
+        } catch (InvalidRequestException ire) {
+          result.ire = ire;
+        } catch (Throwable th) {
+          LOGGER.error("Internal error processing get_key_range", th);
+          TApplicationException x = new TApplicationException(TApplicationException.INTERNAL_ERROR, "Internal error processing get_key_range");
+          oprot.writeMessageBegin(new TMessage("get_key_range", TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        oprot.writeMessageBegin(new TMessage("get_key_range", TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
     private class getStringProperty implements ProcessFunction {
       public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
       {
@@ -11033,80 +11103,209 @@
 
   }
 
-  public static class getStringProperty_args implements TBase, java.io.Serializable, Cloneable   {
-    private static final TStruct STRUCT_DESC = new TStruct("getStringProperty_args");
-    private static final TField PROPERTY_NAME_FIELD_DESC = new TField("propertyName", TType.STRING, (short)-1);
+  public static class get_key_range_args implements TBase, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("get_key_range_args");
+    private static final TField TABLENAME_FIELD_DESC = new TField("tablename", TType.STRING, (short)1);
+    private static final TField START_WITH_FIELD_DESC = new TField("startWith", TType.STRING, (short)2);
+    private static final TField STOP_AT_FIELD_DESC = new TField("stopAt", TType.STRING, (short)3);
+    private static final TField MAX_RESULTS_FIELD_DESC = new TField("maxResults", TType.I32, (short)4);
 
-    public String propertyName;
-    public static final int PROPERTYNAME = -1;
+    public String tablename;
+    public static final int TABLENAME = 1;
+    public String startWith;
+    public static final int STARTWITH = 2;
+    public String stopAt;
+    public static final int STOPAT = 3;
+    public int maxResults;
+    public static final int MAXRESULTS = 4;
 
     private final Isset __isset = new Isset();
     private static final class Isset implements java.io.Serializable {
+      public boolean maxResults = false;
     }
 
     public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
-      put(PROPERTYNAME, new FieldMetaData("propertyName", TFieldRequirementType.DEFAULT, 
+      put(TABLENAME, new FieldMetaData("tablename", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRING)));
+      put(STARTWITH, new FieldMetaData("startWith", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRING)));
+      put(STOPAT, new FieldMetaData("stopAt", TFieldRequirementType.DEFAULT, 
           new FieldValueMetaData(TType.STRING)));
+      put(MAXRESULTS, new FieldMetaData("maxResults", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.I32)));
     }});
 
     static {
-      FieldMetaData.addStructMetaDataMap(getStringProperty_args.class, metaDataMap);
+      FieldMetaData.addStructMetaDataMap(get_key_range_args.class, metaDataMap);
     }
 
-    public getStringProperty_args() {
+    public get_key_range_args() {
+      this.startWith = "";
+
+      this.stopAt = "";
+
+      this.maxResults = 1000;
+
     }
 
-    public getStringProperty_args(
-      String propertyName)
+    public get_key_range_args(
+      String tablename,
+      String startWith,
+      String stopAt,
+      int maxResults)
     {
       this();
-      this.propertyName = propertyName;
+      this.tablename = tablename;
+      this.startWith = startWith;
+      this.stopAt = stopAt;
+      this.maxResults = maxResults;
+      this.__isset.maxResults = true;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public getStringProperty_args(getStringProperty_args other) {
-      if (other.isSetPropertyName()) {
-        this.propertyName = other.propertyName;
+    public get_key_range_args(get_key_range_args other) {
+      if (other.isSetTablename()) {
+        this.tablename = other.tablename;
       }
+      if (other.isSetStartWith()) {
+        this.startWith = other.startWith;
+      }
+      if (other.isSetStopAt()) {
+        this.stopAt = other.stopAt;
+      }
+      __isset.maxResults = other.__isset.maxResults;
+      this.maxResults = other.maxResults;
     }
 
     @Override
-    public getStringProperty_args clone() {
-      return new getStringProperty_args(this);
+    public get_key_range_args clone() {
+      return new get_key_range_args(this);
     }
 
-    public String getPropertyName() {
-      return this.propertyName;
+    public String getTablename() {
+      return this.tablename;
     }
 
-    public void setPropertyName(String propertyName) {
-      this.propertyName = propertyName;
+    public void setTablename(String tablename) {
+      this.tablename = tablename;
     }
 
-    public void unsetPropertyName() {
-      this.propertyName = null;
+    public void unsetTablename() {
+      this.tablename = null;
     }
 
-    // Returns true if field propertyName is set (has been asigned a value) and false otherwise
-    public boolean isSetPropertyName() {
-      return this.propertyName != null;
+    // Returns true if field tablename is set (has been asigned a value) and false otherwise
+    public boolean isSetTablename() {
+      return this.tablename != null;
     }
 
-    public void setPropertyNameIsSet(boolean value) {
+    public void setTablenameIsSet(boolean value) {
       if (!value) {
-        this.propertyName = null;
+        this.tablename = null;
+      }
+    }
+
+    public String getStartWith() {
+      return this.startWith;
+    }
+
+    public void setStartWith(String startWith) {
+      this.startWith = startWith;
+    }
+
+    public void unsetStartWith() {
+      this.startWith = null;
+    }
+
+    // Returns true if field startWith is set (has been asigned a value) and false otherwise
+    public boolean isSetStartWith() {
+      return this.startWith != null;
+    }
+
+    public void setStartWithIsSet(boolean value) {
+      if (!value) {
+        this.startWith = null;
+      }
+    }
+
+    public String getStopAt() {
+      return this.stopAt;
+    }
+
+    public void setStopAt(String stopAt) {
+      this.stopAt = stopAt;
+    }
+
+    public void unsetStopAt() {
+      this.stopAt = null;
+    }
+
+    // Returns true if field stopAt is set (has been asigned a value) and false otherwise
+    public boolean isSetStopAt() {
+      return this.stopAt != null;
+    }
+
+    public void setStopAtIsSet(boolean value) {
+      if (!value) {
+        this.stopAt = null;
       }
     }
 
+    public int getMaxResults() {
+      return this.maxResults;
+    }
+
+    public void setMaxResults(int maxResults) {
+      this.maxResults = maxResults;
+      this.__isset.maxResults = true;
+    }
+
+    public void unsetMaxResults() {
+      this.__isset.maxResults = false;
+    }
+
+    // Returns true if field maxResults is set (has been asigned a value) and false otherwise
+    public boolean isSetMaxResults() {
+      return this.__isset.maxResults;
+    }
+
+    public void setMaxResultsIsSet(boolean value) {
+      this.__isset.maxResults = value;
+    }
+
     public void setFieldValue(int fieldID, Object value) {
       switch (fieldID) {
-      case PROPERTYNAME:
+      case TABLENAME:
         if (value == null) {
-          unsetPropertyName();
+          unsetTablename();
         } else {
-          setPropertyName((String)value);
+          setTablename((String)value);
+        }
+        break;
+
+      case STARTWITH:
+        if (value == null) {
+          unsetStartWith();
+        } else {
+          setStartWith((String)value);
+        }
+        break;
+
+      case STOPAT:
+        if (value == null) {
+          unsetStopAt();
+        } else {
+          setStopAt((String)value);
+        }
+        break;
+
+      case MAXRESULTS:
+        if (value == null) {
+          unsetMaxResults();
+        } else {
+          setMaxResults((Integer)value);
         }
         break;
 
@@ -11117,8 +11316,17 @@
 
     public Object getFieldValue(int fieldID) {
       switch (fieldID) {
-      case PROPERTYNAME:
-        return getPropertyName();
+      case TABLENAME:
+        return getTablename();
+
+      case STARTWITH:
+        return getStartWith();
+
+      case STOPAT:
+        return getStopAt();
+
+      case MAXRESULTS:
+        return new Integer(getMaxResults());
 
       default:
         throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
@@ -11128,8 +11336,14 @@
     // Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise
     public boolean isSet(int fieldID) {
       switch (fieldID) {
-      case PROPERTYNAME:
-        return isSetPropertyName();
+      case TABLENAME:
+        return isSetTablename();
+      case STARTWITH:
+        return isSetStartWith();
+      case STOPAT:
+        return isSetStopAt();
+      case MAXRESULTS:
+        return isSetMaxResults();
       default:
         throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
       }
@@ -11139,21 +11353,48 @@
     public boolean equals(Object that) {
       if (that == null)
         return false;
-      if (that instanceof getStringProperty_args)
-        return this.equals((getStringProperty_args)that);
+      if (that instanceof get_key_range_args)
+        return this.equals((get_key_range_args)that);
       return false;
     }
 
-    public boolean equals(getStringProperty_args that) {
+    public boolean equals(get_key_range_args that) {
       if (that == null)
         return false;
 
-      boolean this_present_propertyName = true && this.isSetPropertyName();
-      boolean that_present_propertyName = true && that.isSetPropertyName();
-      if (this_present_propertyName || that_present_propertyName) {
-        if (!(this_present_propertyName && that_present_propertyName))
+      boolean this_present_tablename = true && this.isSetTablename();
+      boolean that_present_tablename = true && that.isSetTablename();
+      if (this_present_tablename || that_present_tablename) {
+        if (!(this_present_tablename && that_present_tablename))
           return false;
-        if (!this.propertyName.equals(that.propertyName))
+        if (!this.tablename.equals(that.tablename))
+          return false;
+      }
+
+      boolean this_present_startWith = true && this.isSetStartWith();
+      boolean that_present_startWith = true && that.isSetStartWith();
+      if (this_present_startWith || that_present_startWith) {
+        if (!(this_present_startWith && that_present_startWith))
+          return false;
+        if (!this.startWith.equals(that.startWith))
+          return false;
+      }
+
+      boolean this_present_stopAt = true && this.isSetStopAt();
+      boolean that_present_stopAt = true && that.isSetStopAt();
+      if (this_present_stopAt || that_present_stopAt) {
+        if (!(this_present_stopAt && that_present_stopAt))
+          return false;
+        if (!this.stopAt.equals(that.stopAt))
+          return false;
+      }
+
+      boolean this_present_maxResults = true;
+      boolean that_present_maxResults = true;
+      if (this_present_maxResults || that_present_maxResults) {
+        if (!(this_present_maxResults && that_present_maxResults))
+          return false;
+        if (this.maxResults != that.maxResults)
           return false;
       }
 
@@ -11176,9 +11417,31 @@
         }
         switch (field.id)
         {
-          case PROPERTYNAME:
+          case TABLENAME:
             if (field.type == TType.STRING) {
-              this.propertyName = iprot.readString();
+              this.tablename = iprot.readString();
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          case STARTWITH:
+            if (field.type == TType.STRING) {
+              this.startWith = iprot.readString();
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          case STOPAT:
+            if (field.type == TType.STRING) {
+              this.stopAt = iprot.readString();
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          case MAXRESULTS:
+            if (field.type == TType.I32) {
+              this.maxResults = iprot.readI32();
+              this.__isset.maxResults = true;
             } else { 
               TProtocolUtil.skip(iprot, field.type);
             }
@@ -11200,27 +11463,60 @@
       validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
-      if (this.propertyName != null) {
-        oprot.writeFieldBegin(PROPERTY_NAME_FIELD_DESC);
-        oprot.writeString(this.propertyName);
+      if (this.tablename != null) {
+        oprot.writeFieldBegin(TABLENAME_FIELD_DESC);
+        oprot.writeString(this.tablename);
+        oprot.writeFieldEnd();
+      }
+      if (this.startWith != null) {
+        oprot.writeFieldBegin(START_WITH_FIELD_DESC);
+        oprot.writeString(this.startWith);
         oprot.writeFieldEnd();
       }
+      if (this.stopAt != null) {
+        oprot.writeFieldBegin(STOP_AT_FIELD_DESC);
+        oprot.writeString(this.stopAt);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(MAX_RESULTS_FIELD_DESC);
+      oprot.writeI32(this.maxResults);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder("getStringProperty_args(");
+      StringBuilder sb = new StringBuilder("get_key_range_args(");
       boolean first = true;
 
-      sb.append("propertyName:");
-      if (this.propertyName == null) {
+      sb.append("tablename:");
+      if (this.tablename == null) {
         sb.append("null");
       } else {
-        sb.append(this.propertyName);
+        sb.append(this.tablename);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("startWith:");
+      if (this.startWith == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.startWith);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("stopAt:");
+      if (this.stopAt == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.stopAt);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("maxResults:");
+      sb.append(this.maxResults);
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -11232,12 +11528,15 @@
 
   }
 
-  public static class getStringProperty_result implements TBase, java.io.Serializable, Cloneable   {
-    private static final TStruct STRUCT_DESC = new TStruct("getStringProperty_result");
-    private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.STRING, (short)0);
+  public static class get_key_range_result implements TBase, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("get_key_range_result");
+    private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.LIST, (short)0);
+    private static final TField IRE_FIELD_DESC = new TField("ire", TType.STRUCT, (short)1);
 
-    public String success;
+    public List<String> success;
     public static final int SUCCESS = 0;
+    public InvalidRequestException ire;
+    public static final int IRE = 1;
 
     private final Isset __isset = new Isset();
     private static final class Isset implements java.io.Serializable {
@@ -11245,7 +11544,512 @@
 
     public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
       put(SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, 
-          new FieldValueMetaData(TType.STRING)));
+          new ListMetaData(TType.LIST, 
+              new FieldValueMetaData(TType.STRING))));
+      put(IRE, new FieldMetaData("ire", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRUCT)));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(get_key_range_result.class, metaDataMap);
+    }
+
+    public get_key_range_result() {
+    }
+
+    public get_key_range_result(
+      List<String> success,
+      InvalidRequestException ire)
+    {
+      this();
+      this.success = success;
+      this.ire = ire;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public get_key_range_result(get_key_range_result other) {
+      if (other.isSetSuccess()) {
+        List<String> __this__success = new ArrayList<String>();
+        for (String other_element : other.success) {
+          __this__success.add(other_element);
+        }
+        this.success = __this__success;
+      }
+      if (other.isSetIre()) {
+        this.ire = new InvalidRequestException(other.ire);
+      }
+    }
+
+    @Override
+    public get_key_range_result clone() {
+      return new get_key_range_result(this);
+    }
+
+    public int getSuccessSize() {
+      return (this.success == null) ? 0 : this.success.size();
+    }
+
+    public java.util.Iterator<String> getSuccessIterator() {
+      return (this.success == null) ? null : this.success.iterator();
+    }
+
+    public void addToSuccess(String elem) {
+      if (this.success == null) {
+        this.success = new ArrayList<String>();
+      }
+      this.success.add(elem);
+    }
+
+    public List<String> getSuccess() {
+      return this.success;
+    }
+
+    public void setSuccess(List<String> success) {
+      this.success = success;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    // Returns true if field success is set (has been asigned a value) and false otherwise
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public InvalidRequestException getIre() {
+      return this.ire;
+    }
+
+    public void setIre(InvalidRequestException ire) {
+      this.ire = ire;
+    }
+
+    public void unsetIre() {
+      this.ire = null;
+    }
+
+    // Returns true if field ire is set (has been asigned a value) and false otherwise
+    public boolean isSetIre() {
+      return this.ire != null;
+    }
+
+    public void setIreIsSet(boolean value) {
+      if (!value) {
+        this.ire = null;
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      switch (fieldID) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((List<String>)value);
+        }
+        break;
+
+      case IRE:
+        if (value == null) {
+          unsetIre();
+        } else {
+          setIre((InvalidRequestException)value);
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    public Object getFieldValue(int fieldID) {
+      switch (fieldID) {
+      case SUCCESS:
+        return getSuccess();
+
+      case IRE:
+        return getIre();
+
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    // Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise
+    public boolean isSet(int fieldID) {
+      switch (fieldID) {
+      case SUCCESS:
+        return isSetSuccess();
+      case IRE:
+        return isSetIre();
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof get_key_range_result)
+        return this.equals((get_key_range_result)that);
+      return false;
+    }
+
+    public boolean equals(get_key_range_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      boolean this_present_ire = true && this.isSetIre();
+      boolean that_present_ire = true && that.isSetIre();
+      if (this_present_ire || that_present_ire) {
+        if (!(this_present_ire && that_present_ire))
+          return false;
+        if (!this.ire.equals(that.ire))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id)
+        {
+          case SUCCESS:
+            if (field.type == TType.LIST) {
+              {
+                TList _list59 = iprot.readListBegin();
+                this.success = new ArrayList<String>(_list59.size);
+                for (int _i60 = 0; _i60 < _list59.size; ++_i60)
+                {
+                  String _elem61;
+                  _elem61 = iprot.readString();
+                  this.success.add(_elem61);
+                }
+                iprot.readListEnd();
+              }
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          case IRE:
+            if (field.type == TType.STRUCT) {
+              this.ire = new InvalidRequestException();
+              this.ire.read(iprot);
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+            break;
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (this.isSetSuccess()) {
+        oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+        {
+          oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
+          for (String _iter62 : this.success)          {
+            oprot.writeString(_iter62);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      } else if (this.isSetIre()) {
+        oprot.writeFieldBegin(IRE_FIELD_DESC);
+        this.ire.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("get_key_range_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("ire:");
+      if (this.ire == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.ire);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+      // check that fields of type enum have valid values
+    }
+
+  }
+  /** Thrift-generated argument struct for getStringProperty: a single field, propertyName. */
+  public static class getStringProperty_args implements TBase, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("getStringProperty_args");
+    private static final TField PROPERTY_NAME_FIELD_DESC = new TField("propertyName", TType.STRING, (short)-1);
+
+    public String propertyName;
+    public static final int PROPERTYNAME = -1;
+
+    private final Isset __isset = new Isset();
+    private static final class Isset implements java.io.Serializable {
+    }
+
+    public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+      put(PROPERTYNAME, new FieldMetaData("propertyName", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRING)));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(getStringProperty_args.class, metaDataMap);
+    }
+
+    public getStringProperty_args() {
+    }
+
+    public getStringProperty_args(
+      String propertyName)
+    {
+      this();
+      this.propertyName = propertyName;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public getStringProperty_args(getStringProperty_args other) {
+      if (other.isSetPropertyName()) {
+        this.propertyName = other.propertyName;
+      }
+    }
+
+    @Override
+    public getStringProperty_args clone() {
+      return new getStringProperty_args(this);
+    }
+
+    public String getPropertyName() {
+      return this.propertyName;
+    }
+
+    public void setPropertyName(String propertyName) {
+      this.propertyName = propertyName;
+    }
+
+    public void unsetPropertyName() {
+      this.propertyName = null;
+    }
+
+    // Returns true if field propertyName is set (has been assigned a value) and false otherwise
+    public boolean isSetPropertyName() {
+      return this.propertyName != null;
+    }
+
+    public void setPropertyNameIsSet(boolean value) {
+      if (!value) {
+        this.propertyName = null;
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      switch (fieldID) {
+      case PROPERTYNAME:
+        if (value == null) {
+          unsetPropertyName();
+        } else {
+          setPropertyName((String)value);
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    public Object getFieldValue(int fieldID) {
+      switch (fieldID) {
+      case PROPERTYNAME:
+        return getPropertyName();
+
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    // Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise
+    public boolean isSet(int fieldID) {
+      switch (fieldID) {
+      case PROPERTYNAME:
+        return isSetPropertyName();
+      default:
+        throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+      }
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof getStringProperty_args)
+        return this.equals((getStringProperty_args)that);
+      return false;
+    }
+
+    public boolean equals(getStringProperty_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_propertyName = true && this.isSetPropertyName();
+      boolean that_present_propertyName = true && that.isSetPropertyName();
+      if (this_present_propertyName || that_present_propertyName) {
+        if (!(this_present_propertyName && that_present_propertyName))
+          return false;
+        if (!this.propertyName.equals(that.propertyName))
+          return false;
+      }
+
+      return true;
+    }
+    // NOTE(review): generated constant hashCode is consistent with equals but puts every instance in one bucket
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+    /** Reads fields until STOP, skipping unknown ids or unexpected wire types, then calls validate(). */
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id)
+        {
+          case PROPERTYNAME:
+            if (field.type == TType.STRING) {
+              this.propertyName = iprot.readString();
+            } else { 
+              TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+            break;
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (this.propertyName != null) {
+        oprot.writeFieldBegin(PROPERTY_NAME_FIELD_DESC);
+        oprot.writeString(this.propertyName);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("getStringProperty_args(");
+      boolean first = true;
+
+      sb.append("propertyName:");
+      if (this.propertyName == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.propertyName);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+    /** Generated validation hook, called by read() and write(); empty here, so nothing is checked. */
+    public void validate() throws TException {
+      // check for required fields
+      // check that fields of type enum have valid values
+    }
+
+  }
+
+  public static class getStringProperty_result implements TBase, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("getStringProperty_result");
+    private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.STRING, (short)0);
+
+    public String success;
+    public static final int SUCCESS = 0;
+
+    private final Isset __isset = new Isset();
+    private static final class Isset implements java.io.Serializable {
+    }
+
+    public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+      put(SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, 
+          new FieldValueMetaData(TType.STRING)));
     }});
 
     static {
@@ -11795,13 +12599,13 @@
           case SUCCESS:
             if (field.type == TType.LIST) {
               {
-                TList _list59 = iprot.readListBegin();
-                this.success = new ArrayList<String>(_list59.size);
-                for (int _i60 = 0; _i60 < _list59.size; ++_i60)
+                TList _list63 = iprot.readListBegin();
+                this.success = new ArrayList<String>(_list63.size);
+                for (int _i64 = 0; _i64 < _list63.size; ++_i64)
                 {
-                  String _elem61;
-                  _elem61 = iprot.readString();
-                  this.success.add(_elem61);
+                  String _elem65;
+                  _elem65 = iprot.readString();
+                  this.success.add(_elem65);
                 }
                 iprot.readListEnd();
               }
@@ -11829,8 +12633,8 @@
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
         {
           oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
-          for (String _iter62 : this.success)          {
-            oprot.writeString(_iter62);
+          for (String _iter66 : this.success)          {
+            oprot.writeString(_iter66);
           }
           oprot.writeListEnd();
         }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java Wed Apr 29 18:57:35 2009
@@ -87,7 +87,7 @@
     */
     private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
-    /* Hashing strategy Random or OPHF */
-    private static String hashingStrategy_ = DatabaseDescriptor.random_;
+    /* Name of the partitioner class, read from /Storage/Partitioner */
+    private static String partitionerClass_;
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     private static int columnIndexSizeInKB_;
     /* Size of touch key cache */
@@ -137,7 +137,7 @@
             zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
 
-            /* Hashing strategy */
-            hashingStrategy_ = xmlUtils.getNodeValue("/Storage/HashingStrategy");
+            /* Partitioner class */
+            partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
             /* Callout location */
             calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
 
@@ -475,9 +475,10 @@
         return gcGraceInSeconds_;
     }
 
-    public static String getHashingStrategy()
+    /** Returns the partitioner class name configured under /Storage/Partitioner. */
+    public static String getPartitionerClass()
     {
-        return hashingStrategy_;
+        return partitionerClass_;
     }
     
     public static String getZkAddress()

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Wed Apr 29 18:57:35 2009
@@ -290,7 +290,8 @@
         return columns_.getSortedColumns();
     }
 
-    Map<String, IColumn> getColumns()
+    /** Returns columns_.getColumns(); public so Table.getKeyRange can test whether a key has any columns left. */
+    public Map<String, IColumn> getColumns()
     {
         return columns_.getColumns();
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Wed Apr 29 18:57:35 2009
@@ -481,6 +481,17 @@
         return resolveAndRemoveDeleted(columnFamilies);
     }
 
+    /**
+     * Collects key's data for columnFamilyColumn via getColumnFamilies, merges the pieces with
+     * ColumnFamily.resolve, then applies removeDeleted using the caller-supplied gcBefore.
+     */
+    public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter, int gcBefore) throws IOException
+    {
+        List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
+        ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
+        return removeDeleted(cf, gcBefore);
+    }
+
     /**
      *
      * Get the column family in the most efficient order.
@@ -1437,4 +1448,20 @@
         }
         ssTables_.clear();
     }
+
+    /** Returns the current memtable held in memtable_. */
+    public Memtable getMemtable()
+    {
+        return (Memtable) memtable_.get();
+    }
+
+    /**
+     * Returns a read-only snapshot of the SSTable filenames, so callers iterating it (e.g. range
+     * scans) are unaffected by later changes to ssTables_ such as ssTables_.clear().
+     * NOTE(review): the copy itself is not synchronized with writers of ssTables_.
+     */
+    public Set<String> getSSTableFilenames()
+    {
+        return Collections.unmodifiableSet(new java.util.HashSet<String>(ssTables_));
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Wed Apr 29 18:57:35 2009
@@ -19,6 +19,8 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
@@ -28,7 +30,7 @@
 import org.apache.cassandra.dht.IPartitioner;
 
 
-public class FileStruct implements Comparable<FileStruct>
+public class FileStruct implements Comparable<FileStruct>, Iterator<String>
 {
     private String key = null; // decorated!
     private boolean exhausted = false;
@@ -36,6 +38,7 @@
     private DataInputBuffer bufIn;
     private DataOutputBuffer bufOut;
     private IPartitioner partitioner;
+    private final FileStructIterator iterator = new FileStructIterator();
 
     public FileStruct(IFileReader reader, IPartitioner partitioner)
     {
@@ -110,6 +113,8 @@
      * Read the next key from the data file, skipping block indexes.
      * Caller must check isExhausted after each call to see if further
      * reads are valid.
+     * Do not mix with calls to the iterator interface (next/hasNext).
+     * @deprecated prefer the iterator interface (hasNext/next).
      */
     public void advance() throws IOException
     {
@@ -144,4 +149,95 @@
         }
     }
 
+    /** Returns true if another key is available.  Do not mix with manual calls to advance(). */
+    public boolean hasNext()
+    {
+        return iterator.hasNext();
+    }
+
+    /**
+     * Returns the next key and moves past it.  Do not mix with manual calls to advance().
+     * @throws NoSuchElementException if no keys remain
+     */
+    public String next()
+    {
+        return iterator.next();
+    }
+
+    /** Unsupported: keys cannot be removed through this iterator. */
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Adapts advance()/isExhausted() to the Iterator protocol by holding one key of
+     * lookahead in saved (null once the file is exhausted).  Priming is deferred to the
+     * first hasNext()/next() call so that seekTo() can reposition the file beforehand.
+     */
+    private class FileStructIterator
+    {
+        /** next key to return, or null when no keys remain */
+        String saved;
+        /** whether the lookahead has been primed */
+        boolean initialized = false;
+
+        /** Advances the file and stores the new current key in saved (null when exhausted). */
+        private void forward()
+        {
+            try
+            {
+                advance();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            saved = isExhausted() ? null : key;
+        }
+
+        /**
+         * Primes the lookahead exactly once.  If key is already set at that point (e.g. by
+         * seekTo() -- confirm against its body, which this diff does not show), iteration starts
+         * from that key instead of reporting no keys; otherwise the first key is read.
+         */
+        private void maybeInit()
+        {
+            if (initialized)
+            {
+                return;
+            }
+            initialized = true;
+            if (isExhausted())
+            {
+                return;
+            }
+            if (key == null)
+            {
+                forward();
+            }
+            else
+            {
+                saved = key;
+            }
+        }
+
+        public boolean hasNext()
+        {
+            maybeInit();
+            return saved != null;
+        }
+
+        public String next()
+        {
+            maybeInit();
+            if (saved == null)
+            {
+                throw new NoSuchElementException();
+            }
+            String current = saved;
+            forward();
+            return current;
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Wed Apr 29 18:57:35 2009
@@ -25,6 +25,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -47,6 +50,7 @@
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.service.StorageService;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.DestructivePQIterator;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -441,4 +445,22 @@
             flushQueuer = new FutureTask(runnable, null);
         }
     }
+
+    /**
+     * Returns an iterator over this memtable's keys, ordered by the partitioner's
+     * decorated-key comparator.  The keys are loaded into a fresh PriorityQueue and handed
+     * out by DestructivePQIterator (which, per its name, drains that queue as it iterates).
+     */
+    public Iterator<String> sortedKeyIterator()
+    {
+        Set<String> keys = columnFamilies_.keySet();
+        if (keys.size() == 0)
+        {
+            // PriorityQueue rejects an initial capacity below 1, so return an empty iterator directly
+            return Arrays.asList(new String[0]).iterator();
+        }
+        PriorityQueue<String> pq = new PriorityQueue<String>(keys.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
+        pq.addAll(keys);
+        return new DestructivePQIterator<String>(pq);
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Wed Apr 29 18:57:35 2009
@@ -40,7 +40,7 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MemtableManager.class);
     private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
-    static MemtableManager instance() 
+    public static MemtableManager instance()
     {
         if ( instance_ == null )
         {
@@ -157,7 +157,22 @@
     	}
     }
 
-
-
+    /** Returns a copy of the memtables in history_ for cfName; never null, and always a mutable list. */
+    public List<Memtable> getUnflushedMemtables(String cfName)
+    {
+        rwLock_.readLock().lock();
+        try
+        {
+            List<Memtable> memtables = history_.get(cfName);
+            // copy under the read lock; the empty case is a mutable list too, like the non-empty case
+            return memtables == null
+                   ? new ArrayList<Memtable>()
+                   : new ArrayList<Memtable>(memtables);
+        }
+        finally
+        {
+            rwLock_.readLock().unlock();
+        }
+    }
 
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeCommand.java?rev=769874&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeCommand.java Wed Apr 29 18:57:35 2009
@@ -0,0 +1,82 @@
+package org.apache.cassandra.db;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.util.Arrays;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Request for the keys of one table within a key range, sent to a node via the
+ * StorageService.rangeVerbHandler_ verb.  Bound semantics are those of Table.getKeyRange.
+ */
+public class RangeCommand
+{
+    private static final RangeCommandSerializer serializer = new RangeCommandSerializer();
+
+    /** name of the table to scan */
+    public final String table;
+    /** first key of the range (inclusive) */
+    public final String startWith;
+    /** last key of the range (inclusive); empty string means no upper bound */
+    public final String stopAt;
+    /** maximum number of keys to return */
+    public final int maxResults;
+
+    public RangeCommand(String table, String startWith, String stopAt, int maxResults)
+    {
+        this.table = table;
+        this.startWith = startWith;
+        this.stopAt = stopAt;
+        this.maxResults = maxResults;
+    }
+
+    /**
+     * Serializes this command into a Message sent from the local endpoint, targeted at
+     * the read stage and the range verb handler.
+     */
+    public Message getMessage() throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        serializer.serialize(this, dob);
+        // copy only the bytes written; the buffer's backing array may be larger
+        return new Message(StorageService.getLocalStorageEndPoint(),
+                           StorageService.readStage_,
+                           StorageService.rangeVerbHandler_,
+                           Arrays.copyOf(dob.getData(), dob.getLength()));
+    }
+
+    /** Deserializes a RangeCommand from the first element of message's body. */
+    public static RangeCommand read(Message message) throws IOException
+    {
+        byte[] bytes = (byte[]) message.getMessageBody()[0];
+        DataInputBuffer dib = new DataInputBuffer();
+        dib.reset(bytes, bytes.length);
+        return serializer.deserialize(new DataInputStream(dib));
+    }
+}
+
+/**
+ * Wire format: table, startWith and stopAt via writeUTF, followed by maxResults as an int.
+ */
+class RangeCommandSerializer implements ICompactSerializer<RangeCommand>
+{
+    public void serialize(RangeCommand command, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(command.table);
+        dos.writeUTF(command.startWith);
+        dos.writeUTF(command.stopAt);
+        dos.writeInt(command.maxResults);
+    }
+
+    public RangeCommand deserialize(DataInputStream dis) throws IOException
+    {
+        // Java evaluates constructor arguments left to right, matching the write order above
+        return new RangeCommand(dis.readUTF(), dis.readUTF(), dis.readUTF(), dis.readInt());
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeReply.java?rev=769874&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeReply.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RangeReply.java Wed Apr 29 18:57:35 2009
@@ -0,0 +1,63 @@
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Arrays;
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Reply to a RangeCommand: the matching keys.  Serialized as consecutive writeUTF
+ * strings with no count prefix; read() consumes strings until the body is exhausted.
+ */
+public class RangeReply
+{
+    /** the keys, in the order supplied; unmodifiable */
+    public final List<String> keys;
+
+    public RangeReply(List<String> keys)
+    {
+        // copy first, so later changes to the caller's list cannot leak into this reply
+        this.keys = Collections.unmodifiableList(new ArrayList<String>(keys));
+    }
+
+    /** Builds the response to originalMessage, carrying the serialized keys. */
+    public Message getReply(Message originalMessage)
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        for (String key : keys)
+        {
+            try
+            {
+                dob.writeUTF(key);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
+        return originalMessage.getReply(StorageService.getLocalStorageEndPoint(), data);
+    }
+
+    /** Parses a reply body produced by getReply. */
+    public static RangeReply read(byte[] body) throws IOException
+    {
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+
+        List<String> keys = new ArrayList<String>();
+        while (bufIn.getPosition() < body.length)
+        {
+            keys.add(bufIn.readUTF());
+        }
+
+        return new RangeReply(keys);
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Wed Apr 29 18:57:35 2009
@@ -23,8 +23,13 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
+
 import org.apache.cassandra.analytics.DBAnalyticsSource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
@@ -61,7 +66,7 @@
      * is basically the column family name and the ID associated with
      * this column family. We use this ID in the Commit Log header to
      * determine when a log file that has been rolled can be deleted.
-    */    
+    */
     public static class TableMetadata
     {
         /* Name of the column family */
@@ -387,8 +392,10 @@
     /* ColumnFamilyStore per column family */
     private Map<String, ColumnFamilyStore> columnFamilyStores_ = new HashMap<String, ColumnFamilyStore>();
     /* The AnalyticsSource instance which keeps track of statistics reported to Ganglia. */
-    private DBAnalyticsSource dbAnalyticsSource_;    
-    
+    private DBAnalyticsSource dbAnalyticsSource_;
+    // cache application CFs since Range queries ask for them a _lot_ (volatile: built lazily, read by many threads)
+    private volatile Set<String> applicationColumnFamilies_;
+
     public static Table open(String table)
     {
         Table tableInstance = instances_.get(table);
@@ -840,4 +847,125 @@
         long timeTaken = System.currentTimeMillis() - start;
         dbAnalyticsSource_.updateWriteStatistics(timeTaken);
     }
+
+    /**
+     * Returns the names of this table's column families for which
+     * DatabaseDescriptor.isApplicationColumnFamily is true.  Computed once, then cached.
+     */
+    public Set<String> getApplicationColumnFamilies()
+    {
+        Set<String> cached = applicationColumnFamilies_;
+        if (cached == null)
+        {
+            // build into a local and publish only the finished set, so no other thread can
+            // observe it half-populated; two threads racing here just build it twice
+            cached = new HashSet<String>();
+            for (String cfName : getColumnFamilies())
+            {
+                if (DatabaseDescriptor.isApplicationColumnFamily(cfName))
+                {
+                    cached.add(cfName);
+                }
+            }
+            applicationColumnFamilies_ = cached;
+        }
+        return cached;
+    }
+
+    /**
+     * Returns keys stored on this node that fall between startWith and stopAt and have
+     * non-deleted data in at least one application column family, in partitioner order.
+     *
+     * @param startWith key to start with, inclusive.  empty string = start at beginning.
+     * @param stopAt key to stop at, inclusive.  empty string = stop only when keys are exhausted.
+     * @param maxResults maximum number of keys to return
+     * @return list of keys between startWith and stopAt
+     */
+    public List<String> getKeyRange(final String startWith, final String stopAt, int maxResults) throws IOException
+    {
+        // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
+        final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
+
+        // memtable keys are not positioned like SSTables, so filter them to [startWith, stopAt];
+        // the predicate is stateless, so a single instance serves every memtable
+        Predicate inRange = new Predicate()
+        {
+            public boolean evaluate(Object key)
+            {
+                String st = (String)key;
+                return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
+            }
+        };
+
+        // merge the per-source sorted key streams (current memtable, historical memtables,
+        // and SSTables) into one ordered stream with a CollatedIterator; duplicates are skipped below.
+        List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
+        for (String cfName : getApplicationColumnFamilies())
+        {
+            ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
+
+            // memtable keys: current and historical
+            Iterator<Memtable> memtables = (Iterator<Memtable>) IteratorUtils.chainedIterator(
+                    IteratorUtils.singletonIterator(cfs.getMemtable()),
+                    MemtableManager.instance().getUnflushedMemtables(cfName).iterator());
+            while (memtables.hasNext())
+            {
+                iterators.add(IteratorUtils.filteredIterator(memtables.next().sortedKeyIterator(), inRange));
+            }
+
+            // sstables: seekTo positions each file at startWith; stopAt is enforced in the loop below
+            // TODO(review): these FileStructs and their readers are never closed (file handle leak)
+            for (String filename : cfs.getSSTableFilenames())
+            {
+                FileStruct fs = new FileStruct(SequenceFile.reader(filename), StorageService.getPartitioner());
+                fs.seekTo(startWith);
+                iterators.add(fs);
+            }
+        }
+        Iterator<String> iter = IteratorUtils.collatedIterator(comparator, iterators);
+
+        // pull keys out of the CollatedIterator.  checking tombstone status is expensive,
+        // so we set an arbitrary limit on how many we'll do at once.
+        List<String> keys = new ArrayList<String>();
+        String last = null;
+        while (keys.size() < maxResults && iter.hasNext())
+        {
+            String current = iter.next();
+            // a key present in several sources comes out of the collation once per source
+            if (current.equals(last))
+            {
+                continue;
+            }
+            if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
+            {
+                break;
+            }
+            last = current;
+            if (hasLiveData(current))
+            {
+                keys.add(current);
+            }
+        }
+
+        return keys;
+    }
+
+    /**
+     * Returns true if key still has at least one column, after deleted data is removed
+     * with gcBefore = Integer.MAX_VALUE, in any application column family.
+     */
+    private boolean hasLiveData(String key) throws IOException
+    {
+        // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
+        for (String cfName : getApplicationColumnFamilies())
+        {
+            ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
+            ColumnFamily cf = cfs.getColumnFamily(key, cfName, new IdentityFilter(), Integer.MAX_VALUE);
+            if (cf != null && cf.getColumns().size() > 0)
+            {
+                return true;
+            }
+        }
+        return false;
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java Wed Apr 29 18:57:35 2009
@@ -96,4 +96,10 @@
     {
         return buffer_.getLength();
     }
+
+    /** Returns the current position within the buffer, as reported by buffer_.getPosition(). */
+    public int getPosition()
+    {
+        return buffer_.getPosition();
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Wed Apr 29 18:57:35 2009
@@ -596,8 +596,6 @@
             /* we do this because relative position of the key within a block is stored. */
             if (position != -1L)
                 position = blockIndexPosition - position;
-            else
-                throw new IOException("This key " + key + " does not exist in this file.");
             return position;
         }
 
@@ -712,6 +710,8 @@
             /* Goto the Block Index */
             seek(section.end_);
             long position = getPositionFromBlockIndex(key);
+            if (position == -1)
+                throw new IOException("This key " + key + " does not exist in this file.");
             seek(position);
         }
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Wed Apr 29 18:57:35 2009
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
@@ -47,7 +48,15 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
 import org.apache.cassandra.db.TableNotDefinedException;
+import org.apache.cassandra.db.RangeCommand;
+import org.apache.cassandra.db.RangeReply;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.thrift.TException;
 
 /**
@@ -546,6 +555,33 @@
         return result;
     }
 
+    /**
+     * Thrift entry point for key-range queries.  Builds a RangeCommand for the given table,
+     * sends it to the endpoint StorageService picks as suitable for startWith, and returns the
+     * keys carried in the RangeReply.
+     *
+     * Only a single node is queried for now (see the TODO below), so ranges spanning multiple
+     * nodes are not yet supported.  NOTE(review): the system tests treat both bounds as
+     * inclusive and '' as an open bound; confirm against Table.getKeyRange.
+     *
+     * @param tablename  table to scan
+     * @param startWith  first key of the range
+     * @param stopAt     last key of the range
+     * @param maxResults maximum number of keys to return
+     * @return keys in the order defined by the partitioner's collation
+     * @throws InvalidRequestException if the configured partitioner is not an OrderPreservingPartitioner
+     */
+    public List<String> get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws InvalidRequestException
+    {
+        logger_.debug("get_key_range");
+
+        if (!(StorageService.getPartitioner() instanceof OrderPreservingPartitioner))
+        {
+            throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
+        }
+
+        try
+        {
+            Message message = new RangeCommand(tablename, startWith, stopAt, maxResults).getMessage();
+            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(startWith);
+            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+
+            // read response
+            // TODO send more requests if we need to span multiple nodes
+            // double the usual timeout since range requests are expensive
+            byte[] responseBody = (byte[])iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
+            return RangeReply.read(responseBody).keys;
+        }
+        catch (Exception e)
+        {
+            // restore the interrupt flag if any of the calls above was interrupted
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+            throw new RuntimeException("get_key_range failed for table " + tablename
+                                       + " from '" + startWith + "' to '" + stopAt + "'", e);
+        }
+    }
+
     /*
      * This method is used to ensure that all keys
      * prior to the specified key, as dtermined by

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java?rev=769874&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java Wed Apr 29 18:57:35 2009
@@ -0,0 +1,33 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RangeCommand;
+import org.apache.cassandra.db.RangeReply;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Serves range-query requests locally: decodes the RangeCommand carried by the message,
+ * asks the named Table for the matching keys, and sends them back to the sender as a
+ * RangeReply.  If decoding or the lookup throws IOException, it is rethrown unchecked
+ * and no reply is sent.
+ */
+public class RangeVerbHandler implements IVerbHandler
+{
+    public void doVerb(Message message)
+    {
+        List<String> matchingKeys = fetchKeys(message);
+        Message reply = new RangeReply(matchingKeys).getReply(message);
+        MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
+    }
+
+    /* Decodes the RangeCommand in the message and runs it against the local table. */
+    private static List<String> fetchKeys(Message message)
+    {
+        try
+        {
+            RangeCommand cmd = RangeCommand.read(message);
+            return Table.open(cmd.table).getKeyRange(cmd.startWith, cmd.stopAt, cmd.maxResults);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Wed Apr 29 18:57:35 2009
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -138,6 +139,7 @@
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
     public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
+    /* verb name RangeVerbHandler is registered under; final so it cannot be reassigned, like the names above */
+    public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
 
     public static enum ConsistencyLevel
     {
@@ -303,6 +305,7 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );        
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.calloutDeployVerbHandler_, new CalloutDeployVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.touchVerbHandler_, new TouchVerbHandler());
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
         
         /* register the stage for the mutations */
         int threadCount = DatabaseDescriptor.getThreadsPerPool();
@@ -415,14 +418,14 @@
 
     static
     {
-        String hashingStrategy = DatabaseDescriptor.getHashingStrategy();
-        if (DatabaseDescriptor.ophf_.equalsIgnoreCase(hashingStrategy))
+        /*
+         * Instantiate the partitioner class named by DatabaseDescriptor.getPartitionerClass()
+         * through its public no-arg constructor.  Any failure (class not found, not an
+         * IPartitioner, no such constructor) aborts class initialization, and the offending
+         * class name is included in the message.
+         */
+        String partitionerClassName = DatabaseDescriptor.getPartitionerClass();
+        try
         {
-            partitioner_ = new OrderPreservingPartitioner();
-        }        
-        else
+            Class<?> cls = Class.forName(partitionerClassName);
+            partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
+        }
+        catch (Exception e)
         {
-            partitioner_ = new RandomPartitioner();
+            throw new RuntimeException("Unable to instantiate partitioner " + partitionerClassName, e);
         }
     }
     

Added: incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java?rev=769874&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java Wed Apr 29 18:57:35 2009
@@ -0,0 +1,25 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+public class DestructivePQIterator<T> implements Iterator<T> {
+    private PriorityQueue<T> pq;
+
+    public DestructivePQIterator(PriorityQueue<T> pq) {
+        this.pq = pq;
+    }
+
+    public boolean hasNext() {
+        return pq.size() > 0;
+    }
+
+    public T next() {
+        return pq.poll();
+    }
+
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
+

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Wed Apr 29 18:57:35 2009
@@ -1,12 +1,6 @@
 <Storage>
    <ClusterName>Test Cluster</ClusterName>
-   <!-- any IPartitioner may be used, including your own
-        as long as it is on the classpath.  Out of the box,
-        Cassandra provides
-        org.apache.cassandra.dht.RandomPartitioner and
-        org.apache.cassandra.dht.OrderPreservingPartitioner.
-        Range queries require using OrderPreservingPartitioner or a subclass. -->
-   <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
+   <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
    <RackAware>false</RackAware>
    <MulticastChannel>230.0.0.1</MulticastChannel>
    <ReplicationFactor>1</ReplicationFactor>

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=769874&r1=769873&r2=769874&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Wed Apr 29 18:57:35 2009
@@ -221,3 +221,38 @@
                            columns=[column_t(columnName='c4', value='value4', timestamp=0)]),
              superColumn_t(name='sc2', 
                            columns=[column_t(columnName='c5', value='value5', timestamp=6)])], actual
+
+
+    def test_empty_range(self):
+        assert client.get_key_range('Table1', '', '', 1000) == []
+
+    def test_range_with_remove(self):
+        _insert_simple()
+        assert client.get_key_range('Table1', 'key1', '', 1000) == ['key1']
+
+        client.remove('Table1', 'key1', 'Standard1:c1', 1, True)
+        client.remove('Table1', 'key1', 'Standard1:c2', 1, True)
+        assert client.get_key_range('Table1', '', '', 1000) == []
+
+    def test_range_collation(self):
+        for key in ['-a', '-b', 'a', 'b'] + [str(i) for i in xrange(100)]:
+            client.insert_blocking('Table1', key, 'Standard1:' + key, 'v', 0)
+        L = client.get_key_range('Table1', '', '', 1000)
+        # note the collated ordering rather than ascii
+        assert L == ['0', '1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '2', '20', '21', '22', '23', '24', '25', '26', '27','28', '29', '3', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '4', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '5', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '6', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '7', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '8', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '9', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', 'a', '-a', 'b', '-b'], L
+
+    def test_range_partial(self):
+        for key in ['-a', '-b', 'a', 'b'] + [str(i) for i in xrange(100)]:
+            client.insert_blocking('Table1', key, 'Standard1:' + key, 'v', 0)
+
+        L = client.get_key_range('Table1', 'a', '', 1000)
+        assert L == ['a', '-a', 'b', '-b'], L
+
+        L = client.get_key_range('Table1', '', '15', 1000)
+        assert L == ['0', '1', '10', '11', '12', '13', '14', '15'], L
+
+        L = client.get_key_range('Table1', '50', '51', 1000)
+        assert L == ['50', '51'], L
+    
+        L = client.get_key_range('Table1', '1', '', 10)
+        assert L == ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18'], L