You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/18 18:40:43 UTC
svn commit: r986781 - in /cassandra/trunk: ./ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/thrift/
Author: jbellis
Date: Wed Aug 18 16:40:42 2010
New Revision: 986781
URL: http://svn.apache.org/viewvc?rev=986781&view=rev
Log:
per-connection read-your-writes "session" consistency. patch by Brian Palmer; reviewed by jbellis for CASSANDRA-876
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 18 16:40:42 2010
@@ -16,6 +16,7 @@ dev
* add support for GT/E, LT/E in subordinate index clauses (CASSANDRA-1401)
* cfId counter got out of sync when CFs were added (CASSANDRA-1403)
* less chatty schema updates (CASSANDRA-1389)
+ * add optional per-connection read-your-writes consistency (CASSANDRA-876)
0.7-beta1
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Aug 18 16:40:42 2010
@@ -17,6 +17,7 @@ Features
- `truncate` thrift method allows clearing an entire ColumnFamily at once
- Hadoop OutputFormat support
- Up to 8x faster reads from row cache
+ - Optional per-connection read-your-writes consistency
- A new ByteOrderedPartitioner supports bytes keys with arbitrary content,
and orders keys by their byte value. This should be used in new
deployments instead of OrderPreservingPartitioner.
Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Wed Aug 18 16:40:42 2010
@@ -385,6 +385,13 @@ service Cassandra {
# set keyspace
void set_keyspace(1: required string keyspace) throws (1:InvalidRequestException ire),
+ # enable session consistency
+ # call again to start a new session
+ void enable_session_consistency(),
+
+ # disable a previously enabled session
+ void disable_session_consistency(),
+
# retrieval methods
/**
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Wed Aug 18 16:40:42 2010
@@ -54,6 +54,10 @@ public class Cassandra {
public void set_keyspace(String keyspace) throws InvalidRequestException, TException;
+ public void enable_session_consistency() throws TException;
+
+ public void disable_session_consistency() throws TException;
+
/**
* Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is
* the only method that can throw an exception under non-failure conditions.)
@@ -287,6 +291,10 @@ public class Cassandra {
public void set_keyspace(String keyspace, AsyncMethodCallback<AsyncClient.set_keyspace_call> resultHandler) throws TException;
+ public void enable_session_consistency(AsyncMethodCallback<AsyncClient.enable_session_consistency_call> resultHandler) throws TException;
+
+ public void disable_session_consistency(AsyncMethodCallback<AsyncClient.disable_session_consistency_call> resultHandler) throws TException;
+
public void get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level, AsyncMethodCallback<AsyncClient.get_call> resultHandler) throws TException;
public void get_slice(byte[] key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level, AsyncMethodCallback<AsyncClient.get_slice_call> resultHandler) throws TException;
@@ -454,6 +462,70 @@ public class Cassandra {
return;
}
+ public void enable_session_consistency() throws TException
+ {
+ send_enable_session_consistency();
+ recv_enable_session_consistency();
+ }
+
+ public void send_enable_session_consistency() throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.CALL, ++seqid_));
+ enable_session_consistency_args args = new enable_session_consistency_args();
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public void recv_enable_session_consistency() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ if (msg.seqid != seqid_) {
+ throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "enable_session_consistency failed: out of sequence response");
+ }
+ enable_session_consistency_result result = new enable_session_consistency_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ return;
+ }
+
+ public void disable_session_consistency() throws TException
+ {
+ send_disable_session_consistency();
+ recv_disable_session_consistency();
+ }
+
+ public void send_disable_session_consistency() throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.CALL, ++seqid_));
+ disable_session_consistency_args args = new disable_session_consistency_args();
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public void recv_disable_session_consistency() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ if (msg.seqid != seqid_) {
+ throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "disable_session_consistency failed: out of sequence response");
+ }
+ disable_session_consistency_result result = new disable_session_consistency_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ return;
+ }
+
public ColumnOrSuperColumn get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
{
send_get(key, column_path, consistency_level);
@@ -1577,6 +1649,62 @@ public class Cassandra {
}
}
+ public void enable_session_consistency(AsyncMethodCallback<enable_session_consistency_call> resultHandler) throws TException {
+ checkReady();
+ enable_session_consistency_call method_call = new enable_session_consistency_call(resultHandler, this, protocolFactory, transport);
+ manager.call(method_call);
+ }
+
+ public static class enable_session_consistency_call extends TAsyncMethodCall {
+ public enable_session_consistency_call(AsyncMethodCallback<enable_session_consistency_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(TProtocol prot) throws TException {
+ prot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.CALL, 0));
+ enable_session_consistency_args args = new enable_session_consistency_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws TException {
+ if (getState() != State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array());
+ TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_enable_session_consistency();
+ }
+ }
+
+ public void disable_session_consistency(AsyncMethodCallback<disable_session_consistency_call> resultHandler) throws TException {
+ checkReady();
+ disable_session_consistency_call method_call = new disable_session_consistency_call(resultHandler, this, protocolFactory, transport);
+ manager.call(method_call);
+ }
+
+ public static class disable_session_consistency_call extends TAsyncMethodCall {
+ public disable_session_consistency_call(AsyncMethodCallback<disable_session_consistency_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(TProtocol prot) throws TException {
+ prot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.CALL, 0));
+ disable_session_consistency_args args = new disable_session_consistency_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws TException {
+ if (getState() != State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array());
+ TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_disable_session_consistency();
+ }
+ }
+
public void get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level, AsyncMethodCallback<get_call> resultHandler) throws TException {
checkReady();
get_call method_call = new get_call(key, column_path, consistency_level, resultHandler, this, protocolFactory, transport);
@@ -2448,6 +2576,8 @@ public class Cassandra {
iface_ = iface;
processMap_.put("login", new login());
processMap_.put("set_keyspace", new set_keyspace());
+ processMap_.put("enable_session_consistency", new enable_session_consistency());
+ processMap_.put("disable_session_consistency", new disable_session_consistency());
processMap_.put("get", new get());
processMap_.put("get_slice", new get_slice());
processMap_.put("get_count", new get_count());
@@ -2578,6 +2708,58 @@ public class Cassandra {
}
+ private class enable_session_consistency implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+ {
+ enable_session_consistency_args args = new enable_session_consistency_args();
+ try {
+ args.read(iprot);
+ } catch (TProtocolException e) {
+ iprot.readMessageEnd();
+ TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ oprot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ iprot.readMessageEnd();
+ enable_session_consistency_result result = new enable_session_consistency_result();
+ iface_.enable_session_consistency();
+ oprot.writeMessageBegin(new TMessage("enable_session_consistency", TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
+ private class disable_session_consistency implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+ {
+ disable_session_consistency_args args = new disable_session_consistency_args();
+ try {
+ args.read(iprot);
+ } catch (TProtocolException e) {
+ iprot.readMessageEnd();
+ TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ oprot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ iprot.readMessageEnd();
+ disable_session_consistency_result result = new disable_session_consistency_result();
+ iface_.disable_session_consistency();
+ oprot.writeMessageBegin(new TMessage("disable_session_consistency", TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
private class get implements ProcessFunction {
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
{
@@ -4847,6 +5029,768 @@ public class Cassandra {
}
+ public static class enable_session_consistency_args implements TBase<enable_session_consistency_args, enable_session_consistency_args._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("enable_session_consistency_args");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(enable_session_consistency_args.class, metaDataMap);
+ }
+
+ public enable_session_consistency_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public enable_session_consistency_args(enable_session_consistency_args other) {
+ }
+
+ public enable_session_consistency_args deepCopy() {
+ return new enable_session_consistency_args(this);
+ }
+
+ @Deprecated
+ public enable_session_consistency_args clone() {
+ return new enable_session_consistency_args(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof enable_session_consistency_args)
+ return this.equals((enable_session_consistency_args)that);
+ return false;
+ }
+
+ public boolean equals(enable_session_consistency_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(enable_session_consistency_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ enable_session_consistency_args typedOther = (enable_session_consistency_args)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("enable_session_consistency_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class enable_session_consistency_result implements TBase<enable_session_consistency_result, enable_session_consistency_result._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("enable_session_consistency_result");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(enable_session_consistency_result.class, metaDataMap);
+ }
+
+ public enable_session_consistency_result() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public enable_session_consistency_result(enable_session_consistency_result other) {
+ }
+
+ public enable_session_consistency_result deepCopy() {
+ return new enable_session_consistency_result(this);
+ }
+
+ @Deprecated
+ public enable_session_consistency_result clone() {
+ return new enable_session_consistency_result(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof enable_session_consistency_result)
+ return this.equals((enable_session_consistency_result)that);
+ return false;
+ }
+
+ public boolean equals(enable_session_consistency_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(enable_session_consistency_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ enable_session_consistency_result typedOther = (enable_session_consistency_result)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("enable_session_consistency_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class disable_session_consistency_args implements TBase<disable_session_consistency_args, disable_session_consistency_args._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("disable_session_consistency_args");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(disable_session_consistency_args.class, metaDataMap);
+ }
+
+ public disable_session_consistency_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public disable_session_consistency_args(disable_session_consistency_args other) {
+ }
+
+ public disable_session_consistency_args deepCopy() {
+ return new disable_session_consistency_args(this);
+ }
+
+ @Deprecated
+ public disable_session_consistency_args clone() {
+ return new disable_session_consistency_args(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof disable_session_consistency_args)
+ return this.equals((disable_session_consistency_args)that);
+ return false;
+ }
+
+ public boolean equals(disable_session_consistency_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(disable_session_consistency_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ disable_session_consistency_args typedOther = (disable_session_consistency_args)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("disable_session_consistency_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class disable_session_consistency_result implements TBase<disable_session_consistency_result, disable_session_consistency_result._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("disable_session_consistency_result");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(disable_session_consistency_result.class, metaDataMap);
+ }
+
+ public disable_session_consistency_result() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public disable_session_consistency_result(disable_session_consistency_result other) {
+ }
+
+ public disable_session_consistency_result deepCopy() {
+ return new disable_session_consistency_result(this);
+ }
+
+ @Deprecated
+ public disable_session_consistency_result clone() {
+ return new disable_session_consistency_result(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof disable_session_consistency_result)
+ return this.equals((disable_session_consistency_result)that);
+ return false;
+ }
+
+ public boolean equals(disable_session_consistency_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(disable_session_consistency_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ disable_session_consistency_result typedOther = (disable_session_consistency_result)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("disable_session_consistency_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
public static class get_args implements TBase<get_args, get_args._Fields>, java.io.Serializable, Cloneable {
private static final TStruct STRUCT_DESC = new TStruct("get_args");
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Aug 18 16:40:42 2010
@@ -108,12 +108,7 @@ public class Memtable implements Compara
isFrozen = true;
}
- /**
- * Should only be called by ColumnFamilyStore.apply. NOT a public API.
- * (CFS handles locking to avoid submitting an op
- * to a flushing memtable. Any other way is unsafe.)
- */
- void put(DecoratedKey key, ColumnFamily columnFamily)
+ public void put(DecoratedKey key, ColumnFamily columnFamily)
{
assert !isFrozen; // not 100% foolproof but hell, it's an assert
resolve(key, columnFamily);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Aug 18 16:40:42 2010
@@ -25,10 +25,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.concurrent.StageManager;
@@ -86,12 +88,41 @@ public abstract class ReadCommand
public abstract ReadCommand copy();
- public abstract Row getRow(Table table) throws IOException;
+ /**
+  * Reads the row for this command from the given table, using the filter
+  * supplied by getQueryFilter().
+  */
+ public Row getRow(Table table) throws IOException
+ {
+     final QueryFilter filter = getQueryFilter();
+     return table.getRow(filter);
+ }
+
+ /** Returns the filter for this command; getRow() and mergeRowWithMemtables() both read through it. */
+ protected abstract QueryFilter getQueryFilter();
- protected AbstractType getComparator()
+ protected AbstractType getComparator()
{
return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
}
+
+ /**
+  * Overlays the per-session memtable contents for this command's column family
+  * on a row that was read from the cluster.
+  *
+  * @param table the table the command reads from
+  * @param row   the row returned by the read; may be null when nothing was found
+  * @param map   session memtables keyed by column family store
+  * @return {@code row} unchanged when the session holds no memtable (or no iterator)
+  *         for this read; otherwise a row for the filter's key whose column family
+  *         is the read result resolved with the session's columns
+  */
+ public Row mergeRowWithMemtables(Table table, Row row, Map<ColumnFamilyStore, Memtable> map)
+ {
+     QueryFilter filter = getQueryFilter();
+     ColumnFamilyStore cfStore = table.getColumnFamilyStore(filter.getColumnFamilyName());
+     Memtable memtable = map.get(cfStore);
+     if (memtable == null)
+         return row; // the session has no memtable for this column family: nothing to overlay
+
+     IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfStore.getComparator());
+     // NOTE(review): defensive -- presumably the filter can return null when the key is absent from the memtable; confirm
+     if (iter == null)
+         return row;
+
+     final ColumnFamily memtableCf = ColumnFamily.create(cfStore.metadata);
+     try
+     {
+         try
+         {
+             // third argument: current time in seconds minus the column family's gcGraceSeconds
+             filter.collectCollatedColumns(memtableCf, iter, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.gcGraceSeconds);
+         }
+         finally
+         {
+             // release the iterator even when collation fails
+             iter.close();
+         }
+     }
+     catch (IOException e)
+     {
+         throw new RuntimeException(e);
+     }
+
+     // callers pass the read result before null-checking it, so row itself may be null here
+     if (row == null || row.cf == null)
+         return new Row(filter.key, memtableCf);
+     row.cf.resolve(memtableCf);
+     return new Row(filter.key, row.cf);
+ }
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Wed Aug 18 16:40:42 2010
@@ -53,13 +53,6 @@ public class SliceByNamesReadCommand ext
}
@Override
- public Row getRow(Table table) throws IOException
- {
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(QueryFilter.getNamesFilter(dk, queryPath, columnNames));
- }
-
- @Override
public String toString()
{
return "SliceByNamesReadCommand(" +
@@ -70,6 +63,11 @@ public class SliceByNamesReadCommand ext
')';
}
+ /**
+  * Builds the names filter for this command: the key is decorated with the
+  * partitioner and combined with the query path and the requested column names.
+  */
+ @Override
+ protected QueryFilter getQueryFilter()
+ {
+     DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+     return QueryFilter.getNamesFilter(dk, queryPath, columnNames);
+ }
}
class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Wed Aug 18 16:40:42 2010
@@ -65,10 +65,10 @@ public class SliceFromReadCommand extend
}
@Override
- public Row getRow(Table table) throws IOException
+ protected QueryFilter getQueryFilter()
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(QueryFilter.getSliceFilter(dk, queryPath, start, finish, bitmasks, reversed, count));
+ return QueryFilter.getSliceFilter(dk, queryPath, start, finish, bitmasks, reversed, count);
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Aug 18 16:40:42 2010
@@ -31,6 +31,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.LocalToken;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
@@ -469,6 +470,28 @@ public class Table
memtablesToFlush.put(cfs, memtableToFlush);
}
+ /**
+  * Applies each column family of the mutation to the matching memtable in the
+  * supplied map, creating and registering a memtable for a column family store
+  * the first time it is seen. Column families with no known store are logged
+  * and skipped.
+  */
+ public void applyToMemtable(RowMutation rm, Map<ColumnFamilyStore, Memtable> memtables)
+ {
+     for (ColumnFamily cf : rm.getColumnFamilies())
+     {
+         ColumnFamilyStore store = columnFamilyStores.get(cf.id());
+         if (store != null)
+         {
+             DecoratedKey dk = StorageService.getPartitioner().decorateKey(rm.key());
+             Memtable target = memtables.get(store);
+             if (target == null)
+             {
+                 // first write to this store through the map: start a fresh memtable
+                 target = new Memtable(store, null);
+                 memtables.put(store, target);
+             }
+             target.put(dk, cf);
+         }
+         else
+         {
+             logger.error("Attempting to mutate non-existant column family " + cf.id());
+         }
+     }
+ }
+
public List<Future<?>> flush() throws IOException
{
List<Future<?>> futures = new ArrayList<Future<?>>();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Aug 18 16:40:42 2010
@@ -28,7 +28,6 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -51,6 +50,7 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -64,6 +64,8 @@ public class StorageProxy implements Sto
private static final LatencyTracker rangeStats = new LatencyTracker();
private static final LatencyTracker writeStats = new LatencyTracker();
+ private static ThreadLocal<Map<ColumnFamilyStore, Memtable>> sessionWrites = new ThreadLocal<Map<ColumnFamilyStore,Memtable>>();
+
private StorageProxy() {}
static
{
@@ -94,7 +96,7 @@ public class StorageProxy implements Sto
RowMutation mostRecentRowMutation = null;
StorageService ss = StorageService.instance;
-
+
try
{
for (RowMutation rm : mutations)
@@ -161,6 +163,14 @@ public class StorageProxy implements Sto
{
responseHandler.get();
}
+ if (sessionWrites.get() != null)
+ {
+ for (RowMutation rm : mutations)
+ {
+ // no need to apply locally-written mutations to the session
+ Table.open(rm.getTable()).applyToMemtable(rm, sessionWrites.get());
+ }
+ }
}
catch (IOException e)
{
@@ -230,7 +240,7 @@ public class StorageProxy implements Sto
// send off all the commands asynchronously
List<Future<Object>> localFutures = null;
- List<IAsyncResult> remoteResults = null;
+ List<Pair<IAsyncResult,ReadCommand>> remoteResults = null;
for (ReadCommand command: commands)
{
InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
@@ -247,13 +257,13 @@ public class StorageProxy implements Sto
else
{
if (remoteResults == null)
- remoteResults = new ArrayList<IAsyncResult>();
+ remoteResults = new ArrayList<Pair<IAsyncResult, ReadCommand>>();
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
if (randomlyReadRepair(command))
message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+ remoteResults.add(new Pair<IAsyncResult,ReadCommand>(MessagingService.instance.sendRR(message, endPoint), command));
}
}
@@ -276,14 +286,17 @@ public class StorageProxy implements Sto
}
if (remoteResults != null)
{
- for (IAsyncResult iar: remoteResults)
+ for (Pair<IAsyncResult,ReadCommand> iar: remoteResults)
{
byte[] body;
- body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ body = iar.left.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (response.row() != null)
- rows.add(response.row());
+ Row row = response.row();
+ if (sessionWrites.get() != null)
+ row = iar.right.mergeRowWithMemtables(Table.open(iar.right.table), row, sessionWrites.get());
+ if (row != null)
+ rows.add(row);
}
}
@@ -351,6 +364,10 @@ public class StorageProxy implements Sto
{
long startTime2 = System.currentTimeMillis();
row = quorumResponseHandler.get();
+ if (sessionWrites.get() != null)
+ {
+ row = command.mergeRowWithMemtables(Table.open(command.table), row, sessionWrites.get());
+ }
if (row != null)
rows.add(row);
@@ -790,4 +807,14 @@ public class StorageProxy implements Sto
{
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
+
+ /** Installs a fresh, empty session-write map in the calling thread's sessionWrites slot. */
+ public static void enableSessionConsistency()
+ {
+     Map<ColumnFamilyStore, Memtable> freshSession = new HashMap<ColumnFamilyStore, Memtable>();
+     sessionWrites.set(freshSession);
+ }
+
+ /** Clears the calling thread's sessionWrites slot, so later sessionWrites.get() calls on this thread return null. */
+ public static void disableSessionConsistency()
+ {
+ sessionWrites.remove();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=986781&r1=986780&r2=986781&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Aug 18 16:40:42 2010
@@ -681,6 +681,7 @@ public class CassandraServer implements
{
keySpace.remove();
loginDone.remove();
+ StorageProxy.disableSessionConsistency();
if (logger.isDebugEnabled())
logger.debug("logout complete");
@@ -1014,5 +1015,15 @@ public class CassandraServer implements
return StorageProxy.checkSchemaAgreement();
}
+ /** Thrift entry point; delegates to StorageProxy.enableSessionConsistency(). */
+ public void enable_session_consistency() throws TException
+ {
+ StorageProxy.enableSessionConsistency();
+ }
+
+ /** Thrift entry point; delegates to StorageProxy.disableSessionConsistency(). */
+ public void disable_session_consistency() throws TException
+ {
+ StorageProxy.disableSessionConsistency();
+ }
+
// main method moved to CassandraDaemon
}