You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/09 05:15:07 UTC
[1/2] git commit: add atomic_batch_mutate patch by Aleksey Yeschenko;
reviewed by jbellis for CASSANDRA-4542
Updated Branches:
refs/heads/trunk 8b374b206 -> b38ca2879
add atomic_batch_mutate
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4542
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b38ca287
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b38ca287
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b38ca287
Branch: refs/heads/trunk
Commit: b38ca2879cf1cbf5de17e1912772b6588eaa7de6
Parents: 18962d7
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Sep 8 22:13:45 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Sep 8 22:14:41 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 4 +
interface/cassandra.thrift | 28 +-
.../org/apache/cassandra/thrift/Cassandra.java | 4 +-
.../org/apache/cassandra/thrift/Constants.java | 2 +-
.../thrift/SchemaDisagreementException.java | 3 +
.../apache/cassandra/thrift/TimedOutException.java | 126 +++++-
.../org/apache/cassandra/config/CFMetaData.java | 7 +
.../org/apache/cassandra/config/KSMetaData.java | 3 +-
.../org/apache/cassandra/db/BatchlogManager.java | 231 ++++++++++
.../apache/cassandra/db/BatchlogManagerMBean.java | 22 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 6 -
src/java/org/apache/cassandra/db/SystemTable.java | 1 +
src/java/org/apache/cassandra/db/Table.java | 2 +-
.../exceptions/WriteTimeoutException.java | 5 +-
.../service/AbstractWriteResponseHandler.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 325 ++++++++++++---
.../apache/cassandra/service/StorageService.java | 1 +
.../apache/cassandra/thrift/CassandraServer.java | 64 +++-
.../apache/cassandra/thrift/ThriftConversion.java | 3 +
.../cassandra/transport/messages/ErrorMessage.java | 2 +-
21 files changed, 748 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1aa27e1..3af4e5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-beta1
+ * add atomic_batch_mutate (CASSANDRA-4542)
* increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
* include message initiation time to replicas so they can more
accurately drop timed-out requests (CASSANDRA-2858)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index c7a128e..1ab5ebe 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -67,6 +67,10 @@ Features
configuration instead of erroring out indefinitely
- event tracing can be configured per-connection ("trace_next_query")
or globally/probabilistically ("nodetool settraceprobability")
+ - Atomic batches are now supported server-side: Cassandra will
+ guarantee (at the price of pre-writing the batch to another node
+ first) that all mutations in the batch will be applied, even if the
+ coordinator fails mid-batch.
1.1.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 7fdef49..a2d0294 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -55,7 +55,7 @@ namespace rb CassandraThrift
# An effort should be made not to break forward-client-compatibility either
# (e.g. one should avoid removing obsolete fields from the IDL), but no
# guarantees in this respect are made by the Cassandra project.
-const string VERSION = "19.33.0"
+const string VERSION = "19.34.0"
#
@@ -140,12 +140,18 @@ exception UnavailableException {
/** RPC timeout was exceeded. either a node failed mid-operation, or load was too high, or the requested op was too large. */
exception TimedOutException {
- /**
- * if a write operation was acknowledged some replicas but not enough to
- * satisfy the required ConsistencyLevel, the number of successful
- * replies will be given here
+ /**
+ * if a write operation was acknowledged by some replicas but not by enough to
+ * satisfy the required ConsistencyLevel, the number of successful
+ * replies will be given here. In case of atomic_batch_mutate method this field
+ * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
*/
1: optional i32 acknowledged_by
+
+ /**
+ * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog.
+ */
+ 2: optional bool acknowledged_by_batchlog
}
/** invalid authentication request (invalid keyspace, user does not exist, or credentials invalid) */
@@ -639,7 +645,6 @@ service Cassandra {
3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
-
/**
Mutate many columns or super columns for many row keys. See also: Mutation.
@@ -648,7 +653,16 @@ service Cassandra {
void batch_mutate(1:required map<binary, map<string, list<Mutation>>> mutation_map,
2:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
-
+
+ /**
+ Atomically mutate many columns or super columns for many row keys. See also: Mutation.
+
+ mutation_map maps key to column family to a list of Mutation objects to take place at that scope.
+ **/
+ void atomic_batch_mutate(1:required map<binary, map<string, list<Mutation>>> mutation_map,
+ 2:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
+ throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
+
/**
Truncate will mark and entire column family as deleted.
From the user's perspective a successful call to truncate will result complete data deletion from cfname.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index fc179c9..8421215 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -17893,8 +17893,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -27344,6 +27342,8 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 9d0701f..1810548 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
public class Constants {
- public static final String VERSION = "19.33.0";
+ public static final String VERSION = "19.34.0";
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
index 9d405fc..999bdd7 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
@@ -43,6 +43,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * NOTE: This is an outdated exception, left for backward compatibility reasons;
+ * no actual schema agreement validation is done starting from Cassandra 1.2.
+ *
* schemas are not in agreement across all nodes
*/
public class SchemaDisagreementException extends Exception implements org.apache.thrift.TBase<SchemaDisagreementException, SchemaDisagreementException._Fields>, java.io.Serializable, Cloneable {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index bdb63dc..0a53222 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -49,22 +49,33 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TimedOutException");
private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by_batchlog", org.apache.thrift.protocol.TType.BOOL, (short)2);
/**
- * if a write operation was acknowledged some replicas but not enough to
+ * if a write operation was acknowledged by some replicas but not by enough to
* satisfy the required ConsistencyLevel, the number of successful
- * replies will be given here
+ * replies will be given here. In case of atomic_batch_mutate method this field
+ * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
*/
public int acknowledged_by; // required
+ /**
+ * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog.
+ */
+ public boolean acknowledged_by_batchlog; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
/**
- * if a write operation was acknowledged some replicas but not enough to
+ * if a write operation was acknowledged by some replicas but not by enough to
* satisfy the required ConsistencyLevel, the number of successful
- * replies will be given here
+ * replies will be given here. In case of atomic_batch_mutate method this field
+ * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
+ */
+ ACKNOWLEDGED_BY((short)1, "acknowledged_by"),
+ /**
+ * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog.
*/
- ACKNOWLEDGED_BY((short)1, "acknowledged_by");
+ ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -81,6 +92,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
switch(fieldId) {
case 1: // ACKNOWLEDGED_BY
return ACKNOWLEDGED_BY;
+ case 2: // ACKNOWLEDGED_BY_BATCHLOG
+ return ACKNOWLEDGED_BY_BATCHLOG;
default:
return null;
}
@@ -122,13 +135,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
// isset id assignments
private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0;
- private BitSet __isset_bit_vector = new BitSet(1);
+ private static final int __ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID = 1;
+ private BitSet __isset_bit_vector = new BitSet(2);
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.ACKNOWLEDGED_BY, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.ACKNOWLEDGED_BY_BATCHLOG, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by_batchlog", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class, metaDataMap);
}
@@ -143,6 +159,7 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
__isset_bit_vector.clear();
__isset_bit_vector.or(other.__isset_bit_vector);
this.acknowledged_by = other.acknowledged_by;
+ this.acknowledged_by_batchlog = other.acknowledged_by_batchlog;
}
public TimedOutException deepCopy() {
@@ -153,21 +170,25 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
public void clear() {
setAcknowledged_byIsSet(false);
this.acknowledged_by = 0;
+ setAcknowledged_by_batchlogIsSet(false);
+ this.acknowledged_by_batchlog = false;
}
/**
- * if a write operation was acknowledged some replicas but not enough to
+ * if a write operation was acknowledged by some replicas but not by enough to
* satisfy the required ConsistencyLevel, the number of successful
- * replies will be given here
+ * replies will be given here. In case of atomic_batch_mutate method this field
+ * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
*/
public int getAcknowledged_by() {
return this.acknowledged_by;
}
/**
- * if a write operation was acknowledged some replicas but not enough to
+ * if a write operation was acknowledged by some replicas but not by enough to
* satisfy the required ConsistencyLevel, the number of successful
- * replies will be given here
+ * replies will be given here. In case of atomic_batch_mutate method this field
+ * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
*/
public TimedOutException setAcknowledged_by(int acknowledged_by) {
this.acknowledged_by = acknowledged_by;
@@ -188,6 +209,35 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
__isset_bit_vector.set(__ACKNOWLEDGED_BY_ISSET_ID, value);
}
+ /**
+ * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog.
+ */
+ public boolean isAcknowledged_by_batchlog() {
+ return this.acknowledged_by_batchlog;
+ }
+
+ /**
+ * in case of atomic_batch_mutate method this field tells if the batch was written to the batchlog.
+ */
+ public TimedOutException setAcknowledged_by_batchlog(boolean acknowledged_by_batchlog) {
+ this.acknowledged_by_batchlog = acknowledged_by_batchlog;
+ setAcknowledged_by_batchlogIsSet(true);
+ return this;
+ }
+
+ public void unsetAcknowledged_by_batchlog() {
+ __isset_bit_vector.clear(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID);
+ }
+
+ /** Returns true if field acknowledged_by_batchlog is set (has been assigned a value) and false otherwise */
+ public boolean isSetAcknowledged_by_batchlog() {
+ return __isset_bit_vector.get(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID);
+ }
+
+ public void setAcknowledged_by_batchlogIsSet(boolean value) {
+ __isset_bit_vector.set(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case ACKNOWLEDGED_BY:
@@ -198,6 +248,14 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
}
break;
+ case ACKNOWLEDGED_BY_BATCHLOG:
+ if (value == null) {
+ unsetAcknowledged_by_batchlog();
+ } else {
+ setAcknowledged_by_batchlog((Boolean)value);
+ }
+ break;
+
}
}
@@ -206,6 +264,9 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
case ACKNOWLEDGED_BY:
return Integer.valueOf(getAcknowledged_by());
+ case ACKNOWLEDGED_BY_BATCHLOG:
+ return Boolean.valueOf(isAcknowledged_by_batchlog());
+
}
throw new IllegalStateException();
}
@@ -219,6 +280,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
switch (field) {
case ACKNOWLEDGED_BY:
return isSetAcknowledged_by();
+ case ACKNOWLEDGED_BY_BATCHLOG:
+ return isSetAcknowledged_by_batchlog();
}
throw new IllegalStateException();
}
@@ -245,6 +308,15 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
return false;
}
+ boolean this_present_acknowledged_by_batchlog = true && this.isSetAcknowledged_by_batchlog();
+ boolean that_present_acknowledged_by_batchlog = true && that.isSetAcknowledged_by_batchlog();
+ if (this_present_acknowledged_by_batchlog || that_present_acknowledged_by_batchlog) {
+ if (!(this_present_acknowledged_by_batchlog && that_present_acknowledged_by_batchlog))
+ return false;
+ if (this.acknowledged_by_batchlog != that.acknowledged_by_batchlog)
+ return false;
+ }
+
return true;
}
@@ -257,6 +329,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
if (present_acknowledged_by)
builder.append(acknowledged_by);
+ boolean present_acknowledged_by_batchlog = true && (isSetAcknowledged_by_batchlog());
+ builder.append(present_acknowledged_by_batchlog);
+ if (present_acknowledged_by_batchlog)
+ builder.append(acknowledged_by_batchlog);
+
return builder.toHashCode();
}
@@ -278,6 +355,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetAcknowledged_by_batchlog()).compareTo(typedOther.isSetAcknowledged_by_batchlog());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetAcknowledged_by_batchlog()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.acknowledged_by_batchlog, typedOther.acknowledged_by_batchlog);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -303,6 +390,14 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 2: // ACKNOWLEDGED_BY_BATCHLOG
+ if (field.type == org.apache.thrift.protocol.TType.BOOL) {
+ this.acknowledged_by_batchlog = iprot.readBool();
+ setAcknowledged_by_batchlogIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
@@ -323,6 +418,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
oprot.writeI32(this.acknowledged_by);
oprot.writeFieldEnd();
}
+ if (isSetAcknowledged_by_batchlog()) {
+ oprot.writeFieldBegin(ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC);
+ oprot.writeBool(this.acknowledged_by_batchlog);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -337,6 +437,12 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
sb.append(this.acknowledged_by);
first = false;
}
+ if (isSetAcknowledged_by_batchlog()) {
+ if (!first) sb.append(", ");
+ sb.append("acknowledged_by_batchlog:");
+ sb.append(this.acknowledged_by_batchlog);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d410210..9ee684c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -192,6 +192,13 @@ public final class CFMetaData
+ " PRIMARY KEY (session_id, event_id)"
+ ");", Tracing.TRACE_KS);
+    public static final CFMetaData BatchlogCF = compile(16, "CREATE TABLE " + SystemTable.BATCHLOG_CF + " ("
+                                                            + "id uuid PRIMARY KEY,"
+                                                            + "coordinator inet,"
+                                                            + "written_at timestamp,"
+                                                            + "data blob"
+                                                            + ") WITH COMMENT='uncommitted batches' AND gc_grace_seconds=0");
+
public enum Caching
{
ALL, KEYS_ONLY, ROWS_ONLY, NONE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 5b5ab77..88f3037 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -73,7 +73,8 @@ public final class KSMetaData
public static KSMetaData systemKeyspace()
{
- List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.LocalCf,
+ List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.BatchlogCF,
+ CFMetaData.LocalCf,
CFMetaData.PeersCf,
CFMetaData.HintsCf,
CFMetaData.IndexCf,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
new file mode 100644
index 0000000..2ae9361
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.collect.ImmutableSortedSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Replays atomic batches left behind in the local system batchlog table.
+ * <p>
+ * Each batchlog row is keyed by the batch id and stores the coordinator's broadcast address,
+ * the time the batch was written (in milliseconds) and the serialized row mutations. A task
+ * scheduled by {@link #start()} periodically scans the table; every batch that is older than
+ * {@code TIMEOUT} or whose coordinator is not considered alive by the failure detector is
+ * replayed (applied locally or hinted to the other replicas) and then deleted.
+ */
+public class BatchlogManager implements BatchlogManagerMBean
+{
+    private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+    // messaging version used to (de)serialize the mutations stored in the "data" column
+    private static final int VERSION = MessagingService.VERSION_12;
+    // grace period (ms) during which a batch with a live coordinator is left to complete on its own
+    private static final long TIMEOUT = 2 * DatabaseDescriptor.getRpcTimeout();
+    // delay (ms) between the end of one replay pass and the start of the next
+    private static final long REPLAY_INTERVAL = 10 * 60 * 1000;
+
+    private static final ByteBuffer COORDINATOR = columnName("coordinator");
+    private static final ByteBuffer WRITTEN_AT = columnName("written_at");
+    private static final ByteBuffer DATA = columnName("data");
+    // Sort the names with the column family's comparator: natural ByteBuffer ordering is not
+    // guaranteed to agree with it, and names-filter reads may rely on comparator order.
+    private static final SortedSet<ByteBuffer> META = ImmutableSortedSet.orderedBy(CFMetaData.BatchlogCF.comparator)
+                                                                        .add(COORDINATOR, WRITTEN_AT)
+                                                                        .build();
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+
+    public static final BatchlogManager instance = new BatchlogManager();
+
+    /**
+     * Registers this instance with the platform MBean server and schedules the periodic
+     * replay of failed batches, starting after {@code StorageService.RING_DELAY} ms.
+     *
+     * @throws RuntimeException if the MBean cannot be registered
+     */
+    public void start()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                // An exception escaping a task scheduled with a fixed delay suppresses all of its
+                // subsequent executions; log it instead so the next pass still runs.
+                try
+                {
+                    replayAllFailedBatches();
+                }
+                catch (RuntimeException e)
+                {
+                    logger.error("Failed to replay the batchlog, will retry on the next scheduled pass", e);
+                }
+            }
+        };
+        StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
+                                                            StorageService.RING_DELAY,
+                                                            REPLAY_INTERVAL,
+                                                            TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Builds the mutation that records a batch in the batchlog table.
+     *
+     * @param mutations the row mutations making up the batch
+     * @param uuid the batch id, used as the batchlog row key
+     * @return a system keyspace mutation holding the coordinator address, the write time in
+     *         milliseconds and the serialized mutations, all written at the same timestamp
+     */
+    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
+    {
+        long timestamp = FBUtilities.timestampMicros();
+        ByteBuffer coordinator = InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress());
+        ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+        ByteBuffer data = serializeRowMutations(mutations);
+
+        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF);
+        cf.addColumn(new Column(COORDINATOR, coordinator, timestamp));
+        cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
+        cf.addColumn(new Column(DATA, data, timestamp));
+        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
+        rm.add(cf);
+
+        return rm;
+    }
+
+    /**
+     * Serializes the mutations as an int count followed by each mutation in
+     * {@code VERSION} format.
+     */
+    private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
+    {
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+
+        try
+        {
+            dos.writeInt(mutations.size());
+            for (RowMutation rm : mutations)
+                RowMutation.serializer.serialize(rm, dos, VERSION);
+        }
+        catch (IOException e)
+        {
+            // writing to an in-memory stream is not expected to fail; keep the cause if it does
+            throw new AssertionError(e);
+        }
+
+        return ByteBuffer.wrap(bos.toByteArray());
+    }
+
+    /**
+     * Scans the whole batchlog table and replays every batch that is older than {@code TIMEOUT}
+     * or whose coordinator the failure detector does not consider alive. Rows missing their
+     * metadata columns are replayed unconditionally; rows marked for deletion are skipped.
+     */
+    private static void replayAllFailedBatches()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Started replayAllFailedBatches");
+
+        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+
+        if (store.isEmpty())
+            return;
+
+        IPartitioner partitioner = StorageService.getPartitioner();
+        RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
+        // min-to-min range, intended to span the whole ring
+        AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
+
+        // NOTE(review): all batchlog rows are materialized at once; a large backlog may be memory-heavy
+        List<Row> rows = store.getRangeSlice(null, range, Integer.MAX_VALUE, new NamesQueryFilter(META), null);
+
+        for (Row row : rows)
+        {
+            if (row.cf.isMarkedForDelete())
+                continue;
+
+            IColumn coordinatorColumn = row.cf.getColumn(COORDINATOR);
+            IColumn writtenAtColumn = row.cf.getColumn(WRITTEN_AT);
+
+            // without the metadata there is no way to tell whether the batch is still in flight
+            if (coordinatorColumn == null || writtenAtColumn == null)
+            {
+                replayBatch(row.key);
+                continue;
+            }
+
+            InetAddress coordinator = InetAddressType.instance.compose(coordinatorColumn.value());
+            long writtenAt = LongType.instance.compose(writtenAtColumn.value());
+            // if the batch is new and its coordinator is alive - give it a chance to complete naturally.
+            if (System.currentTimeMillis() < writtenAt + TIMEOUT && FailureDetector.instance.isAlive(coordinator))
+                continue;
+
+            replayBatch(row.key);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("Finished replayAllFailedBatches");
+    }
+
+    /**
+     * Replays a single batch: applies or hints each of its mutations, then deletes the batch.
+     * Does nothing if the batch cannot be read or is marked for deletion.
+     */
+    private static void replayBatch(DecoratedKey key)
+    {
+        UUID uuid = UUIDType.instance.compose(key.key);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Replaying batch {}", uuid);
+
+        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
+        ColumnFamily batch = store.getColumnFamily(filter);
+
+        if (batch == null || batch.isMarkedForDelete())
+            return;
+
+        IColumn dataColumn = batch.getColumn(DATA);
+        try
+        {
+            if (dataColumn != null)
+                writeHintsForSerializedMutations(dataColumn.value());
+        }
+        catch (IOException e)
+        {
+            // NOTE(review): the batch is still deleted below, so mutations not replayed before the
+            // failure are dropped; confirm this best-effort behavior is intended.
+            logger.warn("Skipped batch replay of {} due to {}", uuid, e);
+        }
+
+        deleteBatch(key);
+    }
+
+    /** Deserializes the mutations written by serializeRowMutations and replays each of them. */
+    private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
+    {
+        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+        int size = in.readInt();
+        for (int i = 0; i < size; i++)
+            writeHintsForMutation(RowMutation.serializer.deserialize(in, VERSION));
+    }
+
+    /**
+     * Applies the mutation locally if this node is one of its write endpoints and writes a hint
+     * for every other write endpoint.
+     */
+    private static void writeHintsForMutation(RowMutation mutation) throws IOException
+    {
+        for (InetAddress target : StorageProxy.getWriteEndpoints(mutation.getTable(), mutation.key()))
+        {
+            if (target.equals(FBUtilities.getBroadcastAddress()))
+                mutation.apply();
+            else
+                StorageProxy.writeHintForMutation(mutation, target);
+        }
+    }
+
+    /** Deletes the batchlog column family data for the given key at the current timestamp. */
+    private static void deleteBatch(DecoratedKey key)
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
+        rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
+        rm.apply();
+    }
+
+    /** Encodes a CQL column name with the batchlog column family's column name builder. */
+    private static ByteBuffer columnName(String name)
+    {
+        ByteBuffer raw = UTF8Type.instance.decompose(name);
+        return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
new file mode 100644
index 0000000..0322b21
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+/**
+ * JMX management interface for {@link BatchlogManager}; it currently exposes no operations.
+ */
+public interface BatchlogManagerMBean
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 44dc23a..dd70fc7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1113,11 +1113,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return getColumnFamily(filter, gcBefore());
}
- public ColumnFamily getColumnFamily(QueryFilter filter, ISortedColumns.Factory factory)
- {
- return getColumnFamily(filter, gcBefore());
- }
-
public int gcBefore()
{
return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
@@ -1371,7 +1366,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
- List<Row> rows;
final ViewFragment view = markReferenced(startWith, stopAt);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 18dead3..1cbab1a 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -58,6 +58,7 @@ public class SystemTable
public static final String INDEX_CF = "IndexInfo";
public static final String NODE_ID_CF = "NodeIdInfo";
public static final String HINTS_CF = "hints";
+ public static final String BATCHLOG_CF = "batchlog";
// see layout description in the DefsTable class header
public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index b700ef6..f489ac9 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -338,7 +338,7 @@ public class Table
public Row getRow(QueryFilter filter)
{
ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
- ColumnFamily columnFamily = cfStore.getColumnFamily(filter, ArrayBackedSortedColumns.factory());
+ ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
return new Row(filter.key, columnFamily);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
index 607215b..c6066f6 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
@@ -25,8 +25,11 @@ import org.apache.cassandra.db.ConsistencyLevel;
public class WriteTimeoutException extends RequestTimeoutException
{
- public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor)
+ public final boolean writtenToBatchlog; // whether the batch had already been written to the batchlog when the write timed out
+ // (passed as true by StorageProxy's atomic-batch write path; surfaced to Thrift clients as acknowledged_by_batchlog)
+ public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor, boolean writtenToBatchlog)
{
super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
+ this.writtenToBatchlog = writtenToBatchlog;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 664d9ea..4b3244e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -63,7 +63,7 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
}
if (!success)
- throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor());
+ throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor(), false);
}
protected abstract int ackCount();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 24625df..e3d7de5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -30,10 +30,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +40,9 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.RingPosition;
@@ -56,8 +56,10 @@ import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.ClientRequestMetrics;
import org.apache.cassandra.net.*;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
public class StorageProxy implements StorageProxyMBean
{
@@ -233,6 +235,147 @@ public class StorageProxy implements StorageProxyMBean
}
/**
+ * See mutate. Adds additional steps before and after writing a batch.
+ * Before writing the batch (but after doing availability check against the FD for the row replicas):
+ * write the entire batch to a batchlog on up to two live nodes of the local datacenter (see getBatchlogEndpoints).
+ * After: remove the batchlog entry asynchronously (after writing hints for the batch rows, if necessary).
+ * If the row writes time out, the batchlog entry is not removed and the timeout reports writtenToBatchlog=true.
+ * @param mutations the RowMutations to be applied across the replicas
+ * @param consistency_level the consistency level for the operation
+ */
+ public static void mutateAtomically(List<RowMutation> mutations, ConsistencyLevel consistency_level)
+ throws UnavailableException, WriteTimeoutException
+ {
+ long startTime = System.nanoTime();
+
+ if (logger.isDebugEnabled())
+ logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
+
+ List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+
+ try
+ {
+ // add a handler for each mutation and check availability, but don't initiate any writes yet
+ for (RowMutation mutation : mutations)
+ {
+ WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level);
+ // exit early if we can't fulfill the CL at this time.
+ wrapper.handler.assureSufficientLiveNodes();
+ wrappers.add(wrapper);
+ }
+
+ // write the whole batch to the batchlog; blocks until one batchlog endpoint acknowledges
+ Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter);
+ UUID batchUUID = UUID.randomUUID();
+ syncWriteToBatchlog(mutations, localDataCenter, batchlogEndpoints, batchUUID);
+
+ // now actually perform the writes and wait for them to complete
+ syncWriteBatchedMutations(wrappers, localDataCenter, consistency_level);
+
+ // only reached when every row write succeeded: remove the batchlog entries asynchronously
+ asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+ }
+ catch (UnavailableException e)
+ {
+ writeMetrics.unavailables.mark();
+ ClientRequestMetrics.writeUnavailables.inc();
+ throw e;
+ }
+ catch (WriteTimeoutException e)
+ {
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ throw e;
+ }
+ finally
+ {
+ writeMetrics.addNano(System.nanoTime() - startTime);
+ }
+ }
+
+ private static void syncWriteToBatchlog(List<RowMutation> mutations,
+ String localDataCenter, // NOTE(review): not used by this method
+ Collection<InetAddress> endpoints,
+ UUID uuid)
+ throws WriteTimeoutException
+ {
+ RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
+ IWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ONE, Table.SYSTEM_KS, null);
+ // one acknowledgement (CL.ONE) from any of the batchlog endpoints is enough
+ try
+ {
+ sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler); // endpoints are local-DC nodes, so send to each directly
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error writing to batchlog", e);
+ }
+ // block for that acknowledgement; on timeout report 0 acks and writtenToBatchlog=false
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ throw new WriteTimeoutException(e.consistency, 0, e.blockFor, false);
+ }
+ }
+
+ private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+ {
+ RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid)); // row keyed by the batch UUID
+ rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros()); // delete the batchlog CF data of that row
+ IWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ANY, Table.SYSTEM_KS, null);
+ // the handler is never waited on, so the caller does not block on these deletions
+ try
+ {
+ sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error deleting batch " + uuid, e);
+ }
+ }
+
+ private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers,
+ String localDataCenter,
+ ConsistencyLevel consistencyLevel)
+ throws WriteTimeoutException
+ {
+ for (WriteResponseHandlerWrapper wrapper : wrappers) // first start every write without waiting
+ {
+ try
+ {
+ sendToHintedEndpoints(wrapper.mutation, wrapper.endpoints, wrapper.handler, localDataCenter, consistencyLevel);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e);
+ }
+ catch (OverloadedException e)
+ {
+ // turn OE into TOE; the batch is already in the batchlog, hence writtenToBatchlog=true
+ throw new WriteTimeoutException(consistencyLevel, -1, 0, true);
+ }
+ }
+ // then wait for each handler in turn; the first timeout is rethrown with writtenToBatchlog=true
+ for (WriteResponseHandlerWrapper wrapper : wrappers)
+ {
+ try
+ {
+ wrapper.handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Write timeout {} for {}", e, wrapper.mutation.toString(true));
+ throw new WriteTimeoutException(e.consistency, -1, e.blockFor, true);
+ }
+ }
+ }
+
+ /**
* Perform the write of a mutation given a WritePerformer.
* Gather the list of write endpoints, apply locally and/or forward the mutation to
* said write endpoint (deletaged to the actual WritePerformer) and wait for the
@@ -267,7 +410,31 @@ public class StorageProxy implements StorageProxyMBean
return responseHandler;
}
- private static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
+ // like the write path above, but initiates no writes and does not itself call assureSufficientLiveNodes(); callers must do the availability check (see mutateAtomically).
+ private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level)
+ {
+ AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
+ Collection<InetAddress> writeEndpoints = getWriteEndpoints(mutation.getTable(), mutation.key());
+ IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, null);
+ return new WriteResponseHandlerWrapper(responseHandler, mutation, writeEndpoints);
+ }
+
+ // used by atomic_batch_mutate to decouple the availability check from the write itself; bundles a mutation with its response handler and write endpoints.
+ private static class WriteResponseHandlerWrapper
+ {
+ final IWriteResponseHandler handler;
+ final RowMutation mutation;
+ final Collection<InetAddress> endpoints;
+
+ WriteResponseHandlerWrapper(IWriteResponseHandler handler, RowMutation mutation, Collection<InetAddress> endpoints)
+ {
+ this.handler = handler;
+ this.mutation = mutation;
+ this.endpoints = endpoints;
+ }
+ }
+
+ public static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
{
StorageService ss = StorageService.instance;
Token tk = StorageService.getPartitioner().getToken(key);
@@ -275,6 +442,46 @@ public class StorageProxy implements StorageProxyMBean
return ss.getTokenMetadata().getWriteEndpoints(tk, table, naturalEndpoints);
}
+ /*
+ * Replicas are picked manually:
+ * - replicas should be alive according to the failure detector
+ * - replicas should be in the local datacenter
+ * - choose min(2, number of qualifying candidates above)
+ * - allow the local node to be the only replica only if it's a single-node datacenter
+ */
+ private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter) throws UnavailableException
+ {
+ // copy every known node of the DC (localhost included): Multimap.get() returns a live view, and the remove() below must not mutate the topology
+ Collection<InetAddress> localMembers = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().getTopology().getDatacenterEndpoints().get(localDataCenter));
+
+ // special case for single-node datacenters
+ if (localMembers.size() == 1)
+ return localMembers;
+
+ // not a single-node datacenter - don't count the local node.
+ localMembers.remove(FBUtilities.getBroadcastAddress());
+
+ // include only alive nodes
+ List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size());
+ for (InetAddress member : localMembers)
+ {
+ if (FailureDetector.instance.isAlive(member))
+ candidates.add(member);
+ }
+
+ if (candidates.isEmpty())
+ throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
+
+ if (candidates.size() > 2)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ snitch.sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
+ candidates = candidates.subList(0, 2);
+ }
+
+ return candidates;
+ }
+
/**
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
* is not available.
@@ -368,18 +575,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
- UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
- if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
- {
- logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString());
- return;
- }
- assert hostId != null : "Missing host ID for " + target.getHostAddress();
- RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
- hintedMutation.apply();
-
- totalHints.incrementAndGet();
-
+ writeHintForMutation(mutation, target);
// Notify the handler only for CL == ANY
if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
responseHandler.response(null);
@@ -395,6 +591,21 @@ public class StorageProxy implements StorageProxyMBean
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
+ public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException // builds a hint for target and applies it via apply()
+ {
+ UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target); // hints are keyed by the target's host ID
+ if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12)) // pre-VERSION_12 peers may lack a host ID: skip the hint instead of failing
+ {
+ logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString());
+ return;
+ }
+ assert hostId != null : "Missing host ID for " + target.getHostAddress();
+ RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
+ hintedMutation.apply();
+ // count the hint only after it has been applied
+ totalHints.incrementAndGet();
+ }
+
/**
* for each datacenter, send a message to one node to relay the write to other replicas
*/
@@ -403,52 +614,56 @@ public class StorageProxy implements StorageProxyMBean
{
for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
{
- String dataCenter = entry.getKey();
-
- // send the messages corresponding to this datacenter
+ boolean isLocalDC = entry.getKey().equals(localDataCenter);
for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
{
MessageOut message = messages.getKey();
+ Collection<InetAddress> targets = messages.getValue();
// a single message object is used for unhinted writes, so clean out any forwards
// from previous loop iterations
message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
- Iterator<InetAddress> iter = messages.getValue().iterator();
- InetAddress target = iter.next();
+ sendMessagesToOneDC(message, targets, isLocalDC, handler);
+ }
+ }
+ }
- // direct writes to local DC or old Cassadra versions
- if (dataCenter.equals(localDataCenter) || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
- {
- // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
- // creating a second iterator since we already have a perfectly good one
- MessagingService.instance().sendRR(message, target, handler);
- while (iter.hasNext())
- {
- target = iter.next();
- MessagingService.instance().sendRR(message, target, handler);
- }
- continue;
- }
+ private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, IWriteResponseHandler handler) throws IOException
+ {
+ Iterator<InetAddress> iter = targets.iterator();
+ InetAddress target = iter.next();
- // Add all the other destinations of the same message as a FORWARD_HEADER entry
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(messages.getValue().size() - 1);
- while (iter.hasNext())
- {
- InetAddress destination = iter.next();
- CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
- dos.writeUTF(id);
- if (logger.isDebugEnabled())
- logger.debug("Adding FWD message to: " + destination + " with ID " + id);
- }
- message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
- // send the combined message + forward headers
- String id = MessagingService.instance().sendRR(message, target, handler);
- if (logger.isDebugEnabled())
- logger.debug("Sending message to: " + target + " with ID " + id);
+ // direct writes to local DC or old Cassandra versions
+ if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
+ {
+ // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
+ // creating a second iterator since we already have a perfectly good one
+ MessagingService.instance().sendRR(message, target, handler);
+ while (iter.hasNext())
+ {
+ target = iter.next();
+ MessagingService.instance().sendRR(message, target, handler);
}
+ return;
+ }
+
+ // Add all the other destinations of the same message as a FORWARD_HEADER entry
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeInt(targets.size() - 1);
+ while (iter.hasNext())
+ {
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination, dos);
+ String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
+ dos.writeUTF(id);
+ if (logger.isDebugEnabled())
+ logger.debug("Adding FWD message to: " + destination + " with ID " + id);
}
+ message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
+ // send the combined message + forward headers
+ String id = MessagingService.instance().sendRR(message, target, handler);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending message to: " + target + " with ID " + id);
}
private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cc94727..b347aa6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -515,6 +515,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
HintedHandOffManager.instance.start();
+ BatchlogManager.instance.start();
// We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
// If we are a seed, or if the user manually sets auto_bootstrap to false,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 44c6038..40f6681 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -28,7 +28,6 @@ import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
@@ -670,8 +668,10 @@ public class CassandraServer implements Cassandra.Iface
}
}
- private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
- throws RequestValidationException, UnavailableException, TimedOutException
+ private List<IMutation> createMutationList(ConsistencyLevel consistency_level,
+ Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map,
+ boolean allowCounterMutations)
+ throws RequestValidationException
{
List<String> cfamsSeen = new ArrayList<String>();
List<IMutation> rowMutations = new ArrayList<IMutation>();
@@ -731,11 +731,17 @@ public class CassandraServer implements Cassandra.Iface
}
if (rmStandard != null && !rmStandard.isEmpty())
rowMutations.add(rmStandard);
+
if (rmCounter != null && !rmCounter.isEmpty())
- rowMutations.add(new org.apache.cassandra.db.CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
+ {
+ if (allowCounterMutations)
+ rowMutations.add(new CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
+ else
+ throw new org.apache.cassandra.exceptions.InvalidRequestException("Counter mutations are not allowed in atomic batches");
+ }
}
- doInsert(consistency_level, rowMutations);
+ return rowMutations;
}
public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
@@ -759,7 +765,40 @@ public class CassandraServer implements Cassandra.Iface
try
{
- internal_batch_mutate(mutation_map, consistency_level);
+ doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true));
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
+ }
+
+ public void atomic_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException
+ {
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = Maps.newLinkedHashMap();
+ for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry : mutation_map.entrySet())
+ {
+ traceParameters.put(ByteBufferUtil.bytesToHex(mutationEntry.getKey()),
+ Joiner.on(";").withKeyValueSeparator(":").join(mutationEntry.getValue()));
+ }
+ traceParameters.put("consistency_level", consistency_level.name());
+ Tracing.instance().begin("atomic_batch_mutate", traceParameters);
+ }
+ else
+ {
+ logger.debug("atomic_batch_mutate");
+ }
+
+ try
+ {
+ doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true);
}
catch (RequestValidationException e)
{
@@ -825,6 +864,12 @@ public class CassandraServer implements Cassandra.Iface
private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations)
throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
{
+ doInsert(consistency_level, mutations, false);
+ }
+
+ private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically)
+ throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
+ {
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
consistencyLevel.validateForWrite(state().getKeyspace());
if (mutations.isEmpty())
@@ -833,7 +878,10 @@ public class CassandraServer implements Cassandra.Iface
schedule(DatabaseDescriptor.getWriteRpcTimeout());
try
{
- StorageProxy.mutate(mutations, consistencyLevel);
+ if (mutateAtomically)
+ StorageProxy.mutateAtomically((List<RowMutation>) mutations, consistencyLevel);
+ else
+ StorageProxy.mutate(mutations, consistencyLevel);
}
catch (RequestExecutionException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index ac823e9..aa7d236 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -86,7 +86,10 @@ public class ThriftConversion
{
TimedOutException toe = new TimedOutException();
if (e instanceof WriteTimeoutException)
+ {
toe.setAcknowledged_by(((WriteTimeoutException)e).received);
+ toe.setAcknowledged_by_batchlog(((WriteTimeoutException)e).writtenToBatchlog);
+ }
return toe;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index c21e727..ecb387b 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -79,7 +79,7 @@ public class ErrorMessage extends Message.Response
ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
int received = body.readInt();
int blockFor = body.readInt();
- te = new WriteTimeoutException(cl, received, blockFor);
+ te = new WriteTimeoutException(cl, received, blockFor, false);
}
break;
case READ_TIMEOUT: