You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/18 18:40:43 UTC

svn commit: r986781 - in /cassandra/trunk: ./ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/

Author: jbellis
Date: Wed Aug 18 16:40:42 2010
New Revision: 986781

URL: http://svn.apache.org/viewvc?rev=986781&view=rev
Log:
per-connection read-your-writes "session" consistency.  patch by Brian Palmer; reviewed by jbellis for CASSANDRA-876

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 18 16:40:42 2010
@@ -16,6 +16,7 @@ dev
  * add support for GT/E, LT/E in subordinate index clauses (CASSANDRA-1401)
  * cfId counter got out of sync when CFs were added (CASSANDRA-1403)
  * less chatty schema updates (CASSANDRA-1389)
+ * add optional per-connection read-your-writes consistency (CASSANDRA-876)
 
 
 0.7-beta1

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Aug 18 16:40:42 2010
@@ -17,6 +17,7 @@ Features
     - `truncate` thrift method allows clearing an entire ColumnFamily at once
     - Hadoop OutputFormat support
     - Up to 8x faster reads from row cache
+    - Optional per-connection read-your-writes consistency
     - A new ByteOrderedPartitioner supports bytes keys with arbitrary content,
       and orders keys by their byte value.  This should be used in new
       deployments instead of OrderPreservingPartitioner.

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Wed Aug 18 16:40:42 2010
@@ -385,6 +385,13 @@ service Cassandra {
   # set keyspace
   void set_keyspace(1: required string keyspace) throws (1:InvalidRequestException ire),
   
+  # enable session consistency
+  # call again to start a new session
+  void enable_session_consistency(),
+
+  # disable a previously enabled session
+  void disable_session_consistency(),
+
   # retrieval methods
 
   /**

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Wed Aug 18 16:40:42 2010
@@ -54,6 +54,10 @@ public class Cassandra {
 
     public void set_keyspace(String keyspace) throws InvalidRequestException, TException;
 
+    public void enable_session_consistency() throws TException;
+
+    public void disable_session_consistency() throws TException;
+
     /**
      * Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is
      * the only method that can throw an exception under non-failure conditions.)
@@ -287,6 +291,10 @@ public class Cassandra {
 
     public void set_keyspace(String keyspace, AsyncMethodCallback<AsyncClient.set_keyspace_call> resultHandler) throws TException;
 
+    public void enable_session_consistency(AsyncMethodCallback<AsyncClient.enable_session_consistency_call> resultHandler) throws TException;
+
+    public void disable_session_consistency(AsyncMethodCallback<AsyncClient.disable_session_consistency_call> resultHandler) throws TException;
+
     public void get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level, AsyncMethodCallback<AsyncClient.get_call> resultHandler) throws TException;
 
     public void get_slice(byte[] key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level, AsyncMethodCallback<AsyncClient.get_slice_call> resultHandler) throws TException;
@@ -454,6 +462,70 @@ public class Cassandra {
       return;
     }
 
+    public void enable_session_consistency() throws TException
+    {
+      send_enable_session_consistency();
+      recv_enable_session_consistency();
+    }
+
+    public void send_enable_session_consistency() throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.CALL, ++seqid_));
+      enable_session_consistency_args args = new enable_session_consistency_args();
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public void recv_enable_session_consistency() throws TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      if (msg.seqid != seqid_) {
+        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "enable_session_consistency failed: out of sequence response");
+      }
+      enable_session_consistency_result result = new enable_session_consistency_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      return;
+    }
+
+    public void disable_session_consistency() throws TException
+    {
+      send_disable_session_consistency();
+      recv_disable_session_consistency();
+    }
+
+    public void send_disable_session_consistency() throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.CALL, ++seqid_));
+      disable_session_consistency_args args = new disable_session_consistency_args();
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public void recv_disable_session_consistency() throws TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      if (msg.seqid != seqid_) {
+        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "disable_session_consistency failed: out of sequence response");
+      }
+      disable_session_consistency_result result = new disable_session_consistency_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      return;
+    }
+
     public ColumnOrSuperColumn get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
     {
       send_get(key, column_path, consistency_level);
@@ -1577,6 +1649,62 @@ public class Cassandra {
       }
     }
 
+    public void enable_session_consistency(AsyncMethodCallback<enable_session_consistency_call> resultHandler) throws TException {
+      checkReady();
+      enable_session_consistency_call method_call = new enable_session_consistency_call(resultHandler, this, protocolFactory, transport);
+      manager.call(method_call);
+    }
+
+    public static class enable_session_consistency_call extends TAsyncMethodCall {
+      public enable_session_consistency_call(AsyncMethodCallback<enable_session_consistency_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      public void write_args(TProtocol prot) throws TException {
+        prot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.CALL, 0));
+        enable_session_consistency_args args = new enable_session_consistency_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws TException {
+        if (getState() != State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array());
+        TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_enable_session_consistency();
+      }
+    }
+
+    public void disable_session_consistency(AsyncMethodCallback<disable_session_consistency_call> resultHandler) throws TException {
+      checkReady();
+      disable_session_consistency_call method_call = new disable_session_consistency_call(resultHandler, this, protocolFactory, transport);
+      manager.call(method_call);
+    }
+
+    public static class disable_session_consistency_call extends TAsyncMethodCall {
+      public disable_session_consistency_call(AsyncMethodCallback<disable_session_consistency_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      public void write_args(TProtocol prot) throws TException {
+        prot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.CALL, 0));
+        disable_session_consistency_args args = new disable_session_consistency_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws TException {
+        if (getState() != State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array());
+        TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_disable_session_consistency();
+      }
+    }
+
     public void get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level, AsyncMethodCallback<get_call> resultHandler) throws TException {
       checkReady();
       get_call method_call = new get_call(key, column_path, consistency_level, resultHandler, this, protocolFactory, transport);
@@ -2448,6 +2576,8 @@ public class Cassandra {
       iface_ = iface;
       processMap_.put("login", new login());
       processMap_.put("set_keyspace", new set_keyspace());
+      processMap_.put("enable_session_consistency", new enable_session_consistency());
+      processMap_.put("disable_session_consistency", new disable_session_consistency());
       processMap_.put("get", new get());
       processMap_.put("get_slice", new get_slice());
       processMap_.put("get_count", new get_count());
@@ -2578,6 +2708,58 @@ public class Cassandra {
 
     }
 
+    private class enable_session_consistency implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+      {
+        enable_session_consistency_args args = new enable_session_consistency_args();
+        try {
+          args.read(iprot);
+        } catch (TProtocolException e) {
+          iprot.readMessageEnd();
+          TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+          oprot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        iprot.readMessageEnd();
+        enable_session_consistency_result result = new enable_session_consistency_result();
+        iface_.enable_session_consistency();
+        oprot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
+    private class disable_session_consistency implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+      {
+        disable_session_consistency_args args = new disable_session_consistency_args();
+        try {
+          args.read(iprot);
+        } catch (TProtocolException e) {
+          iprot.readMessageEnd();
+          TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+          oprot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.EXCEPTION, seqid));
+          x.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        iprot.readMessageEnd();
+        disable_session_consistency_result result = new disable_session_consistency_result();
+        iface_.disable_session_consistency();
+        oprot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
     private class get implements ProcessFunction {
       public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
       {
@@ -4847,6 +5029,768 @@ public class Cassandra {
 
   }
 
+  public static class enable_session_consistency_args implements TBase<enable_session_consistency_args, enable_session_consistency_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("enable_session_consistency_args");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      FieldMetaData.addStructMetaDataMap(enable_session_consistency_args.class, metaDataMap);
+    }
+
+    public enable_session_consistency_args() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public enable_session_consistency_args(enable_session_consistency_args other) {
+    }
+
+    public enable_session_consistency_args deepCopy() {
+      return new enable_session_consistency_args(this);
+    }
+
+    @Deprecated
+    public enable_session_consistency_args clone() {
+      return new enable_session_consistency_args(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof enable_session_consistency_args)
+        return this.equals((enable_session_consistency_args)that);
+      return false;
+    }
+
+    public boolean equals(enable_session_consistency_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(enable_session_consistency_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      enable_session_consistency_args typedOther = (enable_session_consistency_args)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("enable_session_consistency_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
+  public static class enable_session_consistency_result implements TBase<enable_session_consistency_result, enable_session_consistency_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("enable_session_consistency_result");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      FieldMetaData.addStructMetaDataMap(enable_session_consistency_result.class, metaDataMap);
+    }
+
+    public enable_session_consistency_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public enable_session_consistency_result(enable_session_consistency_result other) {
+    }
+
+    public enable_session_consistency_result deepCopy() {
+      return new enable_session_consistency_result(this);
+    }
+
+    @Deprecated
+    public enable_session_consistency_result clone() {
+      return new enable_session_consistency_result(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof enable_session_consistency_result)
+        return this.equals((enable_session_consistency_result)that);
+      return false;
+    }
+
+    public boolean equals(enable_session_consistency_result that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(enable_session_consistency_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      enable_session_consistency_result typedOther = (enable_session_consistency_result)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("enable_session_consistency_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
+  public static class disable_session_consistency_args implements TBase<disable_session_consistency_args, disable_session_consistency_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("disable_session_consistency_args");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      FieldMetaData.addStructMetaDataMap(disable_session_consistency_args.class, metaDataMap);
+    }
+
+    public disable_session_consistency_args() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public disable_session_consistency_args(disable_session_consistency_args other) {
+    }
+
+    public disable_session_consistency_args deepCopy() {
+      return new disable_session_consistency_args(this);
+    }
+
+    @Deprecated
+    public disable_session_consistency_args clone() {
+      return new disable_session_consistency_args(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof disable_session_consistency_args)
+        return this.equals((disable_session_consistency_args)that);
+      return false;
+    }
+
+    public boolean equals(disable_session_consistency_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(disable_session_consistency_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      disable_session_consistency_args typedOther = (disable_session_consistency_args)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("disable_session_consistency_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
+  public static class disable_session_consistency_result implements TBase<disable_session_consistency_result, disable_session_consistency_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final TStruct STRUCT_DESC = new TStruct("disable_session_consistency_result");
+
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      FieldMetaData.addStructMetaDataMap(disable_session_consistency_result.class, metaDataMap);
+    }
+
+    public disable_session_consistency_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public disable_session_consistency_result(disable_session_consistency_result other) {
+    }
+
+    public disable_session_consistency_result deepCopy() {
+      return new disable_session_consistency_result(this);
+    }
+
+    @Deprecated
+    public disable_session_consistency_result clone() {
+      return new disable_session_consistency_result(this);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof disable_session_consistency_result)
+        return this.equals((disable_session_consistency_result)that);
+      return false;
+    }
+
+    public boolean equals(disable_session_consistency_result that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(disable_session_consistency_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      disable_session_consistency_result typedOther = (disable_session_consistency_result)other;
+
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        switch (field.id) {
+          default:
+            TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("disable_session_consistency_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
   public static class get_args implements TBase<get_args, get_args._Fields>, java.io.Serializable, Cloneable   {
     private static final TStruct STRUCT_DESC = new TStruct("get_args");
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Aug 18 16:40:42 2010
@@ -108,12 +108,7 @@ public class Memtable implements Compara
         isFrozen = true;
     }
 
-    /**
-     * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
-     * (CFS handles locking to avoid submitting an op
-     *  to a flushing memtable.  Any other way is unsafe.)
-    */
-    void put(DecoratedKey key, ColumnFamily columnFamily)
+    public void put(DecoratedKey key, ColumnFamily columnFamily)
     {
         assert !isFrozen; // not 100% foolproof but hell, it's an assert
         resolve(key, columnFamily);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Aug 18 16:40:42 2010
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.concurrent.StageManager;
@@ -86,12 +88,41 @@ public abstract class ReadCommand
     
     public abstract ReadCommand copy();
 
-    public abstract Row getRow(Table table) throws IOException;
+    public Row getRow(Table table) throws IOException
+    {
+        return table.getRow(getQueryFilter());
+    }
+
+    protected abstract QueryFilter getQueryFilter();
 
-    protected AbstractType getComparator()
+	protected AbstractType getComparator()
     {
         return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
     }
+
+	public Row mergeRowWithMemtables(Table table, Row row, Map<ColumnFamilyStore, Memtable> map)
+	{
+        QueryFilter filter = getQueryFilter();
+        ColumnFamilyStore cfStore = table.getColumnFamilyStore(filter.getColumnFamilyName());
+        Memtable memtable = map.get(cfStore);
+
+        final ColumnFamily memtableCf = ColumnFamily.create(cfStore.metadata);
+        IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfStore.getComparator());
+        filter.collectCollatedColumns(memtableCf, iter, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.gcGraceSeconds);
+        try
+        {
+            iter.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        if (row.cf == null)
+            return new Row(filter.key, memtableCf);
+        row.cf.resolve(memtableCf);
+        return new Row(filter.key, row.cf);
+    }
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Wed Aug 18 16:40:42 2010
@@ -53,13 +53,6 @@ public class SliceByNamesReadCommand ext
     }
     
     @Override
-    public Row getRow(Table table) throws IOException
-    {
-        DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(QueryFilter.getNamesFilter(dk, queryPath, columnNames));
-    }
-
-    @Override
     public String toString()
     {
         return "SliceByNamesReadCommand(" +
@@ -70,6 +63,11 @@ public class SliceByNamesReadCommand ext
                ')';
     }
 
+    protected QueryFilter getQueryFilter()
+    {
+        DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+        return QueryFilter.getNamesFilter(dk, queryPath, columnNames);
+    }
 }
 
 class SliceByNamesReadCommandSerializer extends ReadCommandSerializer

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Wed Aug 18 16:40:42 2010
@@ -65,10 +65,10 @@ public class SliceFromReadCommand extend
     }
 
     @Override
-    public Row getRow(Table table) throws IOException
+    protected QueryFilter getQueryFilter()
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(QueryFilter.getSliceFilter(dk, queryPath, start, finish, bitmasks, reversed, count));
+        return QueryFilter.getSliceFilter(dk, queryPath, start, finish, bitmasks, reversed, count);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Aug 18 16:40:42 2010
@@ -31,6 +31,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
@@ -469,6 +470,28 @@ public class Table
             memtablesToFlush.put(cfs, memtableToFlush);
     }
 
+	public void applyToMemtable(RowMutation rm, Map<ColumnFamilyStore, Memtable> memtables)
+	{
+		for (ColumnFamily columnFamily : rm.getColumnFamilies())
+		{
+			ColumnFamilyStore cfs = columnFamilyStores.get(columnFamily.id());
+			if (cfs == null)
+			{
+				logger.error("Attempting to mutate non-existant column family " + columnFamily.id());
+				continue;
+			}
+			
+			DecoratedKey key = StorageService.getPartitioner().decorateKey(rm.key());
+			Memtable memtable = memtables.get(cfs);
+			if (memtable == null)
+			{
+				memtable = new Memtable(cfs, null);
+				memtables.put(cfs, memtable);
+			}
+            memtable.put(key, columnFamily);
+        }
+	}
+
     public List<Future<?>> flush() throws IOException
     {
         List<Future<?>> futures = new ArrayList<Future<?>>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Aug 18 16:40:42 2010
@@ -28,7 +28,6 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Multimap;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -51,6 +50,7 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.db.filter.QueryFilter;
 
@@ -64,6 +64,8 @@ public class StorageProxy implements Sto
     private static final LatencyTracker rangeStats = new LatencyTracker();
     private static final LatencyTracker writeStats = new LatencyTracker();
 
+    private static ThreadLocal<Map<ColumnFamilyStore, Memtable>> sessionWrites = new ThreadLocal<Map<ColumnFamilyStore,Memtable>>();
+
     private StorageProxy() {}
     static
     {
@@ -94,7 +96,7 @@ public class StorageProxy implements Sto
 
         RowMutation mostRecentRowMutation = null;
         StorageService ss = StorageService.instance;
-        
+
         try
         {
             for (RowMutation rm : mutations)
@@ -161,6 +163,14 @@ public class StorageProxy implements Sto
             {
                 responseHandler.get();
             }
+            if (sessionWrites.get() != null)
+            {
+                for (RowMutation rm : mutations)
+                {
+                    // no need to apply locally-written mutations to the session
+                    Table.open(rm.getTable()).applyToMemtable(rm, sessionWrites.get());
+                }
+            }
         }
         catch (IOException e)
         {
@@ -230,7 +240,7 @@ public class StorageProxy implements Sto
 
         // send off all the commands asynchronously
         List<Future<Object>> localFutures = null;
-        List<IAsyncResult> remoteResults = null;
+        List<Pair<IAsyncResult,ReadCommand>> remoteResults = null;
         for (ReadCommand command: commands)
         {
             InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
@@ -247,13 +257,13 @@ public class StorageProxy implements Sto
             else
             {
                 if (remoteResults == null)
-                    remoteResults = new ArrayList<IAsyncResult>();
+                    remoteResults = new ArrayList<Pair<IAsyncResult, ReadCommand>>();
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
                     logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
                 if (randomlyReadRepair(command))
                     message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-                remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+                remoteResults.add(new Pair<IAsyncResult,ReadCommand>(MessagingService.instance.sendRR(message, endPoint), command));
             }
         }
 
@@ -276,14 +286,17 @@ public class StorageProxy implements Sto
         }
         if (remoteResults != null)
         {
-            for (IAsyncResult iar: remoteResults)
+            for (Pair<IAsyncResult,ReadCommand> iar: remoteResults)
             {
                 byte[] body;
-                body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                body = iar.left.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                 ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
                 ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (response.row() != null)
-                    rows.add(response.row());
+                Row row = response.row();
+                if (sessionWrites.get() != null)
+                    row = iar.right.mergeRowWithMemtables(Table.open(iar.right.table), row, sessionWrites.get());
+                if (row != null)
+                    rows.add(row);
             }
         }
 
@@ -351,6 +364,10 @@ public class StorageProxy implements Sto
             {
                 long startTime2 = System.currentTimeMillis();
                 row = quorumResponseHandler.get();
+                if (sessionWrites.get() != null)
+                {
+                    row = command.mergeRowWithMemtables(Table.open(command.table), row, sessionWrites.get());
+                }
                 if (row != null)
                     rows.add(row);
 
@@ -790,4 +807,14 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
+
+    public static void enableSessionConsistency()
+    {
+        sessionWrites.set(new HashMap<ColumnFamilyStore, Memtable>());
+    }
+
+    public static void disableSessionConsistency()
+    {
+        sessionWrites.remove();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Aug 18 16:40:42 2010
@@ -681,6 +681,7 @@ public class CassandraServer implements 
     {
         keySpace.remove();
         loginDone.remove();
+        StorageProxy.disableSessionConsistency();
 
         if (logger.isDebugEnabled())
             logger.debug("logout complete");
@@ -1014,5 +1015,15 @@ public class CassandraServer implements 
         return StorageProxy.checkSchemaAgreement();
     }
 
+    public void enable_session_consistency() throws TException
+    {
+        StorageProxy.enableSessionConsistency();
+    }
+
+    public void disable_session_consistency() throws TException
+    {
+        StorageProxy.disableSessionConsistency();
+    }
+
     // main method moved to CassandraDaemon
 }