Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/09 05:15:07 UTC

[1/2] git commit: add atomic_batch_mutate patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4542

Updated Branches:
  refs/heads/trunk 8b374b206 -> b38ca2879


add atomic_batch_mutate
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4542


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b38ca287
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b38ca287
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b38ca287

Branch: refs/heads/trunk
Commit: b38ca2879cf1cbf5de17e1912772b6588eaa7de6
Parents: 18962d7
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Sep 8 22:13:45 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Sep 8 22:14:41 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    4 +
 interface/cassandra.thrift                         |   28 +-
 .../org/apache/cassandra/thrift/Cassandra.java     |    4 +-
 .../org/apache/cassandra/thrift/Constants.java     |    2 +-
 .../thrift/SchemaDisagreementException.java        |    3 +
 .../apache/cassandra/thrift/TimedOutException.java |  126 +++++-
 .../org/apache/cassandra/config/CFMetaData.java    |    7 +
 .../org/apache/cassandra/config/KSMetaData.java    |    3 +-
 .../org/apache/cassandra/db/BatchlogManager.java   |  231 ++++++++++
 .../apache/cassandra/db/BatchlogManagerMBean.java  |   22 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    6 -
 src/java/org/apache/cassandra/db/SystemTable.java  |    1 +
 src/java/org/apache/cassandra/db/Table.java        |    2 +-
 .../exceptions/WriteTimeoutException.java          |    5 +-
 .../service/AbstractWriteResponseHandler.java      |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |  325 ++++++++++++---
 .../apache/cassandra/service/StorageService.java   |    1 +
 .../apache/cassandra/thrift/CassandraServer.java   |   64 +++-
 .../apache/cassandra/thrift/ThriftConversion.java  |    3 +
 .../cassandra/transport/messages/ErrorMessage.java |    2 +-
 21 files changed, 748 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1aa27e1..3af4e5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta1
+ * add atomic_batch_mutate (CASSANDRA-4542)
  * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
  * include message initiation time to replicas so they can more
    accurately drop timed-out requests (CASSANDRA-2858)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index c7a128e..1ab5ebe 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -67,6 +67,10 @@ Features
       configuration instead of erroring out indefinitely
     - event tracing can be configured per-connection ("trace_next_query")
       or globally/probabilistically ("nodetool settraceprobability")
+    - Atomic batches are now supported server-side: at the price of
+      pre-writing the batch to another node first, Cassandra guarantees
+      that all mutations in the batch will be applied, even if the
+      coordinator fails mid-batch.
 
 
 1.1.5

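For orientation, a minimal client-side sketch of the new call (not part of this commit; it assumes a connected Thrift-generated Cassandra.Client named `client` with the keyspace already set, and a hypothetical "Standard1" column family):

    import java.nio.ByteBuffer;
    import java.util.*;
    import org.apache.cassandra.thrift.*;
    import org.apache.cassandra.utils.ByteBufferUtil;

    // build the same mutation_map shape that batch_mutate takes:
    // row key -> column family name -> list of Mutations
    Column col = new Column(ByteBufferUtil.bytes("name"));
    col.setValue(ByteBufferUtil.bytes("value"));
    col.setTimestamp(System.currentTimeMillis() * 1000); // microseconds

    Mutation m = new Mutation();
    m.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(col));

    Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap =
        Collections.singletonMap(ByteBufferUtil.bytes("key1"),
            Collections.singletonMap("Standard1", Arrays.asList(m)));

    // same signature as batch_mutate; the coordinator pre-writes the batch
    // to the batchlog before applying any of the mutations
    client.atomic_batch_mutate(mutationMap, ConsistencyLevel.ONE);
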
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 7fdef49..a2d0294 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -55,7 +55,7 @@ namespace rb CassandraThrift
 # An effort should be made not to break forward-client-compatibility either
 # (e.g. one should avoid removing obsolete fields from the IDL), but no
 # guarantees in this respect are made by the Cassandra project.
-const string VERSION = "19.33.0"
+const string VERSION = "19.34.0"
 
 
 #
@@ -140,12 +140,18 @@ exception UnavailableException {
 
 /** RPC timeout was exceeded. Either a node failed mid-operation, or load was too high, or the requested op was too large. */
 exception TimedOutException {
-    /** 
-     * if a write operation was acknowledged some replicas but not enough to 
-     * satisfy the required ConsistencyLevel, the number of successful 
-     * replies will be given here
+    /**
+     * if a write operation was acknowledged by some replicas but not by enough to
+     * satisfy the required ConsistencyLevel, the number of successful
+     * replies will be given here. In the case of the atomic_batch_mutate method, this field
+     * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
      */
     1: optional i32 acknowledged_by
+
+    /**
+     * in the case of the atomic_batch_mutate method, this field tells whether the batch was written to the batchlog.
+     */
+    2: optional bool acknowledged_by_batchlog
 }
 
 /** invalid authentication request (invalid keyspace, user does not exist, or credentials invalid) */
@@ -639,7 +645,6 @@ service Cassandra {
                       3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
-
   /**
     Mutate many columns or super columns for many row keys. See also: Mutation.
 
@@ -648,7 +653,16 @@ service Cassandra {
   void batch_mutate(1:required map<binary, map<string, list<Mutation>>> mutation_map,
                     2:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
-       
+
+  /**
+    Atomically mutate many columns or super columns for many row keys. See also: Mutation.
+
+    mutation_map maps key to column family to a list of Mutation objects to take place at that scope.
+  **/
+  void atomic_batch_mutate(1:required map<binary, map<string, list<Mutation>>> mutation_map,
+                           2:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
+       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
+
   /**
   Truncate will mark an entire column family as deleted.
   From the user's perspective a successful call to truncate will result in complete data deletion from cfname.

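A companion sketch of how a client might act on the extended exception (illustrative only; `client` and `mutationMap` are as in the earlier sketch):

    try
    {
        client.atomic_batch_mutate(mutationMap, ConsistencyLevel.QUORUM);
    }
    catch (TimedOutException e)
    {
        // per the comments above: for atomic_batch_mutate, acknowledged_by is
        // -1 if the batch made it into the batchlog (it will eventually be
        // applied despite the timeout) and 0 if it did not
        boolean inBatchlog = e.isSetAcknowledged_by_batchlog()
                           ? e.isAcknowledged_by_batchlog()
                           : e.isSetAcknowledged_by() && e.getAcknowledged_by() == -1;
        if (inBatchlog)
            System.out.println("batch is durably logged and will be replayed; don't retry blindly");
        else
            System.out.println("batch never reached the batchlog; retrying is safe");
    }
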
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index fc179c9..8421215 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -17893,8 +17893,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -27344,6 +27342,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 9d0701f..1810548 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.33.0";
+  public static final String VERSION = "19.34.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
index 9d405fc..999bdd7 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/SchemaDisagreementException.java
@@ -43,6 +43,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * NOTE: This outdated exception is left in place for backward-compatibility reasons;
+ * no actual schema agreement validation is done starting from Cassandra 1.2.
+ * 
  * schemas are not in agreement across all nodes
  */
 public class SchemaDisagreementException extends Exception implements org.apache.thrift.TBase<SchemaDisagreementException, SchemaDisagreementException._Fields>, java.io.Serializable, Cloneable {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index bdb63dc..0a53222 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -49,22 +49,33 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TimedOutException");
 
   private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by", org.apache.thrift.protocol.TType.I32, (short)1);
+  private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by_batchlog", org.apache.thrift.protocol.TType.BOOL, (short)2);
 
   /**
-   * if a write operation was acknowledged some replicas but not enough to
+   * if a write operation was acknowledged by some replicas but not by enough to
    * satisfy the required ConsistencyLevel, the number of successful
-   * replies will be given here
+   * replies will be given here. In the case of the atomic_batch_mutate method, this field
+   * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
    */
   public int acknowledged_by; // required
+  /**
+   * in the case of the atomic_batch_mutate method, this field tells whether the batch was written to the batchlog.
+   */
+  public boolean acknowledged_by_batchlog; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     /**
-     * if a write operation was acknowledged some replicas but not enough to
+     * if a write operation was acknowledged by some replicas but not by enough to
      * satisfy the required ConsistencyLevel, the number of successful
-     * replies will be given here
+     * replies will be given here. In the case of the atomic_batch_mutate method, this field
+     * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
+     */
+    ACKNOWLEDGED_BY((short)1, "acknowledged_by"),
+    /**
+     * in the case of the atomic_batch_mutate method, this field tells whether the batch was written to the batchlog.
      */
-    ACKNOWLEDGED_BY((short)1, "acknowledged_by");
+    ACKNOWLEDGED_BY_BATCHLOG((short)2, "acknowledged_by_batchlog");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -81,6 +92,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
       switch(fieldId) {
         case 1: // ACKNOWLEDGED_BY
           return ACKNOWLEDGED_BY;
+        case 2: // ACKNOWLEDGED_BY_BATCHLOG
+          return ACKNOWLEDGED_BY_BATCHLOG;
         default:
           return null;
       }
@@ -122,13 +135,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
 
   // isset id assignments
   private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0;
-  private BitSet __isset_bit_vector = new BitSet(1);
+  private static final int __ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID = 1;
+  private BitSet __isset_bit_vector = new BitSet(2);
 
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
     tmpMap.put(_Fields.ACKNOWLEDGED_BY, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.ACKNOWLEDGED_BY_BATCHLOG, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by_batchlog", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class, metaDataMap);
   }
@@ -143,6 +159,7 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
     __isset_bit_vector.clear();
     __isset_bit_vector.or(other.__isset_bit_vector);
     this.acknowledged_by = other.acknowledged_by;
+    this.acknowledged_by_batchlog = other.acknowledged_by_batchlog;
   }
 
   public TimedOutException deepCopy() {
@@ -153,21 +170,25 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
   public void clear() {
     setAcknowledged_byIsSet(false);
     this.acknowledged_by = 0;
+    setAcknowledged_by_batchlogIsSet(false);
+    this.acknowledged_by_batchlog = false;
   }
 
   /**
-   * if a write operation was acknowledged some replicas but not enough to
+   * if a write operation was acknowledged by some replicas but not by enough to
    * satisfy the required ConsistencyLevel, the number of successful
-   * replies will be given here
+   * replies will be given here. In the case of the atomic_batch_mutate method, this field
+   * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
    */
   public int getAcknowledged_by() {
     return this.acknowledged_by;
   }
 
   /**
-   * if a write operation was acknowledged some replicas but not enough to
+   * if a write operation was acknowledged by some replicas but not by enough to
    * satisfy the required ConsistencyLevel, the number of successful
-   * replies will be given here
+   * replies will be given here. In the case of the atomic_batch_mutate method, this field
+   * will be set to -1 if the batch was written to the batchlog and to 0 if it wasn't.
    */
   public TimedOutException setAcknowledged_by(int acknowledged_by) {
     this.acknowledged_by = acknowledged_by;
@@ -188,6 +209,35 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
     __isset_bit_vector.set(__ACKNOWLEDGED_BY_ISSET_ID, value);
   }
 
+  /**
+   * in the case of the atomic_batch_mutate method, this field tells whether the batch was written to the batchlog.
+   */
+  public boolean isAcknowledged_by_batchlog() {
+    return this.acknowledged_by_batchlog;
+  }
+
+  /**
+   * in the case of the atomic_batch_mutate method, this field tells whether the batch was written to the batchlog.
+   */
+  public TimedOutException setAcknowledged_by_batchlog(boolean acknowledged_by_batchlog) {
+    this.acknowledged_by_batchlog = acknowledged_by_batchlog;
+    setAcknowledged_by_batchlogIsSet(true);
+    return this;
+  }
+
+  public void unsetAcknowledged_by_batchlog() {
+    __isset_bit_vector.clear(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID);
+  }
+
+  /** Returns true if field acknowledged_by_batchlog is set (has been assigned a value) and false otherwise */
+  public boolean isSetAcknowledged_by_batchlog() {
+    return __isset_bit_vector.get(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID);
+  }
+
+  public void setAcknowledged_by_batchlogIsSet(boolean value) {
+    __isset_bit_vector.set(__ACKNOWLEDGED_BY_BATCHLOG_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case ACKNOWLEDGED_BY:
@@ -198,6 +248,14 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
       }
       break;
 
+    case ACKNOWLEDGED_BY_BATCHLOG:
+      if (value == null) {
+        unsetAcknowledged_by_batchlog();
+      } else {
+        setAcknowledged_by_batchlog((Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -206,6 +264,9 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
     case ACKNOWLEDGED_BY:
       return Integer.valueOf(getAcknowledged_by());
 
+    case ACKNOWLEDGED_BY_BATCHLOG:
+      return Boolean.valueOf(isAcknowledged_by_batchlog());
+
     }
     throw new IllegalStateException();
   }
@@ -219,6 +280,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
     switch (field) {
     case ACKNOWLEDGED_BY:
       return isSetAcknowledged_by();
+    case ACKNOWLEDGED_BY_BATCHLOG:
+      return isSetAcknowledged_by_batchlog();
     }
     throw new IllegalStateException();
   }
@@ -245,6 +308,15 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
         return false;
     }
 
+    boolean this_present_acknowledged_by_batchlog = true && this.isSetAcknowledged_by_batchlog();
+    boolean that_present_acknowledged_by_batchlog = true && that.isSetAcknowledged_by_batchlog();
+    if (this_present_acknowledged_by_batchlog || that_present_acknowledged_by_batchlog) {
+      if (!(this_present_acknowledged_by_batchlog && that_present_acknowledged_by_batchlog))
+        return false;
+      if (this.acknowledged_by_batchlog != that.acknowledged_by_batchlog)
+        return false;
+    }
+
     return true;
   }
 
@@ -257,6 +329,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
     if (present_acknowledged_by)
       builder.append(acknowledged_by);
 
+    boolean present_acknowledged_by_batchlog = true && (isSetAcknowledged_by_batchlog());
+    builder.append(present_acknowledged_by_batchlog);
+    if (present_acknowledged_by_batchlog)
+      builder.append(acknowledged_by_batchlog);
+
     return builder.toHashCode();
   }
 
@@ -278,6 +355,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetAcknowledged_by_batchlog()).compareTo(typedOther.isSetAcknowledged_by_batchlog());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetAcknowledged_by_batchlog()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.acknowledged_by_batchlog, typedOther.acknowledged_by_batchlog);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -303,6 +390,14 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 2: // ACKNOWLEDGED_BY_BATCHLOG
+          if (field.type == org.apache.thrift.protocol.TType.BOOL) {
+            this.acknowledged_by_batchlog = iprot.readBool();
+            setAcknowledged_by_batchlogIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
       }
@@ -323,6 +418,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
       oprot.writeI32(this.acknowledged_by);
       oprot.writeFieldEnd();
     }
+    if (isSetAcknowledged_by_batchlog()) {
+      oprot.writeFieldBegin(ACKNOWLEDGED_BY_BATCHLOG_FIELD_DESC);
+      oprot.writeBool(this.acknowledged_by_batchlog);
+      oprot.writeFieldEnd();
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -337,6 +437,12 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
       sb.append(this.acknowledged_by);
       first = false;
     }
+    if (isSetAcknowledged_by_batchlog()) {
+      if (!first) sb.append(", ");
+      sb.append("acknowledged_by_batchlog:");
+      sb.append(this.acknowledged_by_batchlog);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d410210..9ee684c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -192,6 +192,13 @@ public final class CFMetaData
                                                                + "  PRIMARY KEY (session_id, event_id)"
                                                                + ");", Tracing.TRACE_KS);
 
+    public static final CFMetaData BatchlogCF = compile(16, "CREATE TABLE " + SystemTable.BATCHLOG_CF + " ("
+                                                            + "id uuid PRIMARY KEY,"
+                                                            + "coordinator inet,"
+                                                            + "written_at timestamp,"
+                                                            + "data blob"
+                                                            + ") WITH COMMENT='uncommitted batches' AND gc_grace_seconds=0");
+
     public enum Caching
     {
         ALL, KEYS_ONLY, ROWS_ONLY, NONE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 5b5ab77..88f3037 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -73,7 +73,8 @@ public final class KSMetaData
 
     public static KSMetaData systemKeyspace()
     {
-        List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.LocalCf,
+        List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.BatchlogCF,
+                                                CFMetaData.LocalCf,
                                                 CFMetaData.PeersCf,
                                                 CFMetaData.HintsCf,
                                                 CFMetaData.IndexCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
new file mode 100644
index 0000000..2ae9361
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.collect.ImmutableSortedSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+    private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+    private static final int VERSION = MessagingService.VERSION_12;
+    private static final long TIMEOUT = 2 * DatabaseDescriptor.getRpcTimeout();
+
+    private static final ByteBuffer COORDINATOR = columnName("coordinator");
+    private static final ByteBuffer WRITTEN_AT = columnName("written_at");
+    private static final ByteBuffer DATA = columnName("data");
+    private static final SortedSet<ByteBuffer> META = ImmutableSortedSet.of(COORDINATOR, WRITTEN_AT);
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+
+    public static final BatchlogManager instance = new BatchlogManager();
+
+    public void start()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                replayAllFailedBatches();
+            }
+        };
+        StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
+                                                            StorageService.RING_DELAY,
+                                                            10 * 60 * 1000,
+                                                            TimeUnit.MILLISECONDS);
+    }
+
+    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
+    {
+        long timestamp = FBUtilities.timestampMicros();
+        ByteBuffer coordinator = InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress());
+        ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+        ByteBuffer data = serializeRowMutations(mutations);
+
+        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF);
+        cf.addColumn(new Column(COORDINATOR, coordinator, timestamp));
+        cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
+        cf.addColumn(new Column(DATA, data, timestamp));
+        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
+        rm.add(cf);
+
+        return rm;
+    }
+
+    private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
+    {
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+
+        try
+        {
+            dos.writeInt(mutations.size());
+            for (RowMutation rm : mutations)
+                RowMutation.serializer.serialize(rm, dos, VERSION);
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(); // cannot happen.
+        }
+
+        return ByteBuffer.wrap(bos.toByteArray());
+    }
+
+    private static void replayAllFailedBatches()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Started replayAllFailedBatches");
+
+        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+
+        if (store.isEmpty())
+            return;
+
+        IPartitioner partitioner = StorageService.getPartitioner();
+        RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
+        AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
+
+        List<Row> rows = store.getRangeSlice(null, range, Integer.MAX_VALUE, new NamesQueryFilter(META), null);
+
+        for (Row row : rows)
+        {
+            if (row.cf.isMarkedForDelete())
+                continue;
+
+            IColumn coordinatorColumn = row.cf.getColumn(COORDINATOR);
+            IColumn writtenAtColumn = row.cf.getColumn(WRITTEN_AT);
+
+            if (coordinatorColumn == null || writtenAtColumn == null)
+            {
+                replayBatch(row.key);
+                continue;
+            }
+
+            InetAddress coordinator = InetAddressType.instance.compose(coordinatorColumn.value());
+            long writtenAt = LongType.instance.compose(writtenAtColumn.value());
+            // if the batch is new and its coordinator is alive - give it a chance to complete naturally.
+            if (System.currentTimeMillis() < writtenAt + TIMEOUT && FailureDetector.instance.isAlive(coordinator))
+                continue;
+
+            replayBatch(row.key);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("Finished replayAllFailedBatches");
+    }
+
+    private static void replayBatch(DecoratedKey key)
+    {
+        UUID uuid = UUIDType.instance.compose(key.key);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Replaying batch {}", uuid);
+
+        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
+        ColumnFamily batch = store.getColumnFamily(filter);
+
+        if (batch == null || batch.isMarkedForDelete())
+            return;
+
+        IColumn dataColumn = batch.getColumn(DATA);
+        try
+        {
+            if (dataColumn != null)
+                writeHintsForSerializedMutations(dataColumn.value());
+        }
+        catch (IOException e)
+        {
+            logger.warn("Skipped batch replay of {} due to {}", uuid, e);
+        }
+
+        deleteBatch(key);
+    }
+
+    private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
+    {
+        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+        int size = in.readInt();
+        for (int i = 0; i < size; i++)
+            writeHintsForMutation(RowMutation.serializer.deserialize(in, VERSION));
+    }
+
+    private static void writeHintsForMutation(RowMutation mutation) throws IOException
+    {
+        for (InetAddress target : StorageProxy.getWriteEndpoints(mutation.getTable(), mutation.key()))
+        {
+            if (target.equals(FBUtilities.getBroadcastAddress()))
+                mutation.apply();
+            else
+                StorageProxy.writeHintForMutation(mutation, target);
+        }
+    }
+
+    private static void deleteBatch(DecoratedKey key)
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
+        rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
+        rm.apply();
+    }
+
+    private static ByteBuffer columnName(String name)
+    {
+        ByteBuffer raw = UTF8Type.instance.decompose(name);
+        return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build();
+    }
+}

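The "data" blob above is simply an int count followed by the VERSION-serialized mutations. A reader that mirrors serializeRowMutations might look like this sketch (not part of the patch; the class itself only ever turns the blob back into hints via writeHintsForSerializedMutations):

    private static List<RowMutation> deserializeRowMutations(ByteBuffer data) throws IOException
    {
        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
        int size = in.readInt(); // number of mutations in the batch
        List<RowMutation> mutations = new ArrayList<RowMutation>(size);
        for (int i = 0; i < size; i++)
            mutations.add(RowMutation.serializer.deserialize(in, VERSION));
        return mutations;
    }
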
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
new file mode 100644
index 0000000..0322b21
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public interface BatchlogManagerMBean
+{
+}

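The interface is empty for now; registering it (see start() in BatchlogManager above) just makes the manager visible over JMX under the name used there. A quick check, as a sketch:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName("org.apache.cassandra.db:type=BatchlogManager");
    boolean registered = mbs.isRegistered(name); // true once BatchlogManager.instance.start() has run
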
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 44dc23a..dd70fc7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1113,11 +1113,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return getColumnFamily(filter, gcBefore());
     }
 
-    public ColumnFamily getColumnFamily(QueryFilter filter, ISortedColumns.Factory factory)
-    {
-        return getColumnFamily(filter, gcBefore());
-    }
-
     public int gcBefore()
     {
         return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
@@ -1371,7 +1366,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
 
-        List<Row> rows;
         final ViewFragment view = markReferenced(startWith, stopAt);
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 18dead3..1cbab1a 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -58,6 +58,7 @@ public class SystemTable
     public static final String INDEX_CF = "IndexInfo";
     public static final String NODE_ID_CF = "NodeIdInfo";
     public static final String HINTS_CF = "hints";
+    public static final String BATCHLOG_CF = "batchlog";
     // see layout description in the DefsTable class header
     public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
     public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index b700ef6..f489ac9 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -338,7 +338,7 @@ public class Table
     public Row getRow(QueryFilter filter)
     {
         ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
-        ColumnFamily columnFamily = cfStore.getColumnFamily(filter, ArrayBackedSortedColumns.factory());
+        ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
         return new Row(filter.key, columnFamily);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
index 607215b..c6066f6 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
@@ -25,8 +25,11 @@ import org.apache.cassandra.db.ConsistencyLevel;
 
 public class WriteTimeoutException extends RequestTimeoutException
 {
-    public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor)
+    public final boolean writtenToBatchlog;
+
+    public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor, boolean writtenToBatchlog)
     {
         super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
+        this.writtenToBatchlog = writtenToBatchlog;
     }
 }

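The ThriftConversion.java hunk is not shown in this message. Given the field comments in cassandra.thrift above, and assuming the received and writtenToBatchlog fields are accessible (consistency and blockFor are used that way in the StorageProxy changes below), the internal-to-Thrift mapping is presumably along these lines; a hypothetical sketch only:

    static TimedOutException toThrift(WriteTimeoutException e)
    {
        TimedOutException toe = new TimedOutException();
        // for atomic batches, StorageProxy (below) already encodes the batchlog
        // outcome into the received count: -1 = logged, 0 = not logged
        toe.setAcknowledged_by(e.received);
        toe.setAcknowledged_by_batchlog(e.writtenToBatchlog);
        return toe;
    }
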
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 664d9ea..4b3244e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -63,7 +63,7 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
         }
 
         if (!success)
-            throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor());
+            throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor(), false);
     }
 
     protected abstract int ackCount();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 24625df..e3d7de5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -30,10 +30,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +40,9 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.RingPosition;
@@ -56,8 +56,10 @@ import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
 import org.apache.cassandra.net.*;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -233,6 +235,147 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * See mutate. Adds additional steps before and after writing a batch.
+     * Before writing the batch (but after doing an availability check against the FD for the row replicas):
+     *      write the entire batch to a batchlog elsewhere in the cluster.
+     * After: remove the batchlog entry (after writing hints for the batch rows, if necessary).
+     *
+     * @param mutations the RowMutations to be applied across the replicas
+     * @param consistency_level the consistency level for the operation
+     */
+    public static void mutateAtomically(List<RowMutation> mutations, ConsistencyLevel consistency_level)
+    throws UnavailableException, WriteTimeoutException
+    {
+        long startTime = System.nanoTime();
+
+        if (logger.isDebugEnabled())
+            logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
+
+        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+
+        try
+        {
+            // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
+            for (RowMutation mutation : mutations)
+            {
+                WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level);
+                // exit early if we can't fulfill the CL at this time.
+                wrapper.handler.assureSufficientLiveNodes();
+                wrappers.add(wrapper);
+            }
+
+            // write to the batchlog
+            Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter);
+            UUID batchUUID = UUID.randomUUID();
+            syncWriteToBatchlog(mutations, localDataCenter, batchlogEndpoints, batchUUID);
+
+            // now actually perform the writes and wait for them to complete
+            syncWriteBatchedMutations(wrappers, localDataCenter, consistency_level);
+
+            // remove the batchlog entries asynchronously
+            asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+        }
+        catch (UnavailableException e)
+        {
+            writeMetrics.unavailables.mark();
+            ClientRequestMetrics.writeUnavailables.inc();
+            throw e;
+        }
+        catch (WriteTimeoutException e)
+        {
+            writeMetrics.timeouts.mark();
+            ClientRequestMetrics.writeTimeouts.inc();
+            throw e;
+        }
+        finally
+        {
+            writeMetrics.addNano(System.nanoTime() - startTime);
+        }
+    }
+
+    private static void syncWriteToBatchlog(List<RowMutation> mutations,
+                                            String localDataCenter,
+                                            Collection<InetAddress> endpoints,
+                                            UUID uuid)
+    throws WriteTimeoutException
+    {
+        RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
+        IWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ONE, Table.SYSTEM_KS, null);
+
+        try
+        {
+            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Error writing to batchlog", e);
+        }
+
+        try
+        {
+            handler.get();
+        }
+        catch (WriteTimeoutException e)
+        {
+            throw new WriteTimeoutException(e.consistency, 0, e.blockFor, false);
+        }
+    }
+
+    private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
+        rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
+        IWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ANY, Table.SYSTEM_KS, null);
+
+        try
+        {
+            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Error deleting batch " + uuid, e);
+        }
+    }
+
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers,
+                                                  String localDataCenter,
+                                                  ConsistencyLevel consistencyLevel)
+    throws WriteTimeoutException
+    {
+        for (WriteResponseHandlerWrapper wrapper : wrappers)
+        {
+            try
+            {
+                sendToHintedEndpoints(wrapper.mutation, wrapper.endpoints, wrapper.handler, localDataCenter, consistencyLevel);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e);
+            }
+            catch (OverloadedException e)
+            {
+                // turn OE into TOE.
+                throw new WriteTimeoutException(consistencyLevel, -1, 0, true);
+            }
+        }
+
+        for (WriteResponseHandlerWrapper wrapper : wrappers)
+        {
+            try
+            {
+                wrapper.handler.get();
+            }
+            catch (WriteTimeoutException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Write timeout {} for {}", e, wrapper.mutation.toString(true));
+                throw new WriteTimeoutException(e.consistency, -1, e.blockFor, true);
+            }
+        }
+    }
+
+    /**
      * Perform the write of a mutation given a WritePerformer.
      * Gather the list of write endpoints, apply locally and/or forward the mutation to
     * said write endpoints (delegated to the actual WritePerformer) and wait for the
@@ -267,7 +410,31 @@ public class StorageProxy implements StorageProxyMBean
         return responseHandler;
     }
 
-    private static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
+    // same as above except it does not initiate writes (but does perform availability checks).
+    private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level)
+    {
+        AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
+        Collection<InetAddress> writeEndpoints = getWriteEndpoints(mutation.getTable(), mutation.key());
+        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, null);
+        return new WriteResponseHandlerWrapper(responseHandler, mutation, writeEndpoints);
+    }
+
+    // used by atomic_batch_mutate to decouple the availability check from the write itself; caches the handler, mutation and endpoints.
+    private static class WriteResponseHandlerWrapper
+    {
+        final IWriteResponseHandler handler;
+        final RowMutation mutation;
+        final Collection<InetAddress> endpoints;
+
+        WriteResponseHandlerWrapper(IWriteResponseHandler handler, RowMutation mutation, Collection<InetAddress> endpoints)
+        {
+            this.handler = handler;
+            this.mutation = mutation;
+            this.endpoints = endpoints;
+        }
+    }
+
+    public static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
     {
         StorageService ss = StorageService.instance;
         Token tk = StorageService.getPartitioner().getToken(key);
@@ -275,6 +442,46 @@ public class StorageProxy implements StorageProxyMBean
         return ss.getTokenMetadata().getWriteEndpoints(tk, table, naturalEndpoints);
     }
 
+    /*
+     * Replicas are picked manually:
+     * - replicas should be alive according to the failure detector
+     * - replicas should be in the local datacenter
+     * - choose min(2, number of qualifying candidates above)
+     * - allow the local node to be the only replica only if it's a single-node cluster
+     */
+    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter) throws UnavailableException
+    {
+        // will include every known node including localhost.
+        Collection<InetAddress> localMembers = StorageService.instance.getTokenMetadata().getTopology().getDatacenterEndpoints().get(localDataCenter);
+
+        // special case for single-node datacenters
+        if (localMembers.size() == 1)
+            return localMembers;
+
+        // not a single-node cluster - don't count the local node.
+        localMembers.remove(FBUtilities.getBroadcastAddress());
+
+        // include only alive nodes
+        List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size());
+        for (InetAddress member : localMembers)
+        {
+            if (FailureDetector.instance.isAlive(member))
+                candidates.add(member);
+        }
+
+        if (candidates.isEmpty())
+            throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
+
+        if (candidates.size() > 2)
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            snitch.sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
+            candidates = candidates.subList(0, 2);
+        }
+
+        return candidates;
+    }
+
     /**
      * Send the mutations to the right targets, write them locally if appropriate, or write a hint when the node
      * is not available.
@@ -368,18 +575,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 try
                 {
-                    UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
-                    if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
-                    {
-                        logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString());
-                        return;
-                    }
-                    assert hostId != null : "Missing host ID for " + target.getHostAddress();
-                    RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
-                    hintedMutation.apply();
-
-                    totalHints.incrementAndGet();
-
+                    writeHintForMutation(mutation, target);
                     // Notify the handler only for CL == ANY
                     if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
                         responseHandler.response(null);
@@ -395,6 +591,21 @@ public class StorageProxy implements StorageProxyMBean
         return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
+    public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException
+    {
+        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
+        if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
+        {
+            logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString());
+            return;
+        }
+        assert hostId != null : "Missing host ID for " + target.getHostAddress();
+        RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
+        hintedMutation.apply();
+
+        totalHints.incrementAndGet();
+    }
+
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
@@ -403,52 +614,56 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
         {
-            String dataCenter = entry.getKey();
-
-            // send the messages corresponding to this datacenter
+            boolean isLocalDC = entry.getKey().equals(localDataCenter);
             for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
             {
                 MessageOut message = messages.getKey();
+                Collection<InetAddress> targets = messages.getValue();
                 // a single message object is used for unhinted writes, so clean out any forwards
                 // from previous loop iterations
                 message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-                Iterator<InetAddress> iter = messages.getValue().iterator();
-                InetAddress target = iter.next();
+                sendMessagesToOneDC(message, targets, isLocalDC, handler);
+            }
+        }
+    }
 
-                // direct writes to local DC or old Cassadra versions
-                if (dataCenter.equals(localDataCenter) || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
-                {
-                    // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
-                    // creating a second iterator since we already have a perfectly good one
-                    MessagingService.instance().sendRR(message, target, handler);
-                    while (iter.hasNext())
-                    {
-                        target = iter.next();
-                        MessagingService.instance().sendRR(message, target, handler);
-                    }
-                    continue;
-                }
+    private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, IWriteResponseHandler handler) throws IOException
+    {
+        Iterator<InetAddress> iter = targets.iterator();
+        InetAddress target = iter.next();
 
-                // Add all the other destinations of the same message as a FORWARD_HEADER entry
-                FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-                DataOutputStream dos = new DataOutputStream(bos);
-                dos.writeInt(messages.getValue().size() - 1);
-                while (iter.hasNext())
-                {
-                    InetAddress destination = iter.next();
-                    CompactEndpointSerializationHelper.serialize(destination, dos);
-                    String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
-                    dos.writeUTF(id);
-                    if (logger.isDebugEnabled())
-                        logger.debug("Adding FWD message to: " + destination + " with ID " + id);
-                }
-                message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
-                // send the combined message + forward headers
-                String id = MessagingService.instance().sendRR(message, target, handler);
-                if (logger.isDebugEnabled())
-                    logger.debug("Sending message to: " + target + " with ID " + id);
+        // direct writes to local DC or old Cassandra versions
+        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
+        {
+            // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
+            // creating a second iterator since we already have a perfectly good one
+            MessagingService.instance().sendRR(message, target, handler);
+            while (iter.hasNext())
+            {
+                target = iter.next();
+                MessagingService.instance().sendRR(message, target, handler);
             }
+            return;
+        }
+
+        // Add all the other destinations of the same message as a FORWARD_HEADER entry
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeInt(targets.size() - 1);
+        while (iter.hasNext())
+        {
+            InetAddress destination = iter.next();
+            CompactEndpointSerializationHelper.serialize(destination, dos);
+            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
+            dos.writeUTF(id);
+            if (logger.isDebugEnabled())
+                logger.debug("Adding FWD message to: " + destination + " with ID " + id);
         }
+        message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
+        // send the combined message + forward headers
+        String id = MessagingService.instance().sendRR(message, target, handler);
+        if (logger.isDebugEnabled())
+            logger.debug("Sending message to: " + target + " with ID " + id);
     }
 
     private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)
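
The refactored sendMessagesToOneDC above keeps the old behavior but makes the
per-datacenter fan-out reusable for both plain and atomic writes: the
coordinator sends the full message to one node in each remote datacenter and
attaches a FORWARD_TO header listing the remaining replicas plus the callback
IDs registered for them, so that single relay node can deliver the message
within its own DC. A simplified, standalone sketch of how that header payload
is laid out (hypothetical class; the real code uses FastByteArrayOutputStream,
CompactEndpointSerializationHelper and MessagingService-issued callback IDs):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.List;

    public class ForwardHeaderSketch
    {
        /**
         * Encode (count, [endpoint, callbackId]...) the way sendMessagesToOneDC
         * builds the FORWARD_TO header. extraTargets excludes the node the
         * message is sent to directly, hence no "size() - 1" here.
         */
        public static byte[] encode(List<InetAddress> extraTargets, List<String> callbackIds) throws IOException
        {
            assert extraTargets.size() == callbackIds.size();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(extraTargets.size());
            for (int i = 0; i < extraTargets.size(); i++)
            {
                byte[] addr = extraTargets.get(i).getAddress();
                dos.writeByte(addr.length); // compact endpoint: length byte + raw address
                dos.write(addr);
                dos.writeUTF(callbackIds.get(i)); // id the relay echoes back on response
            }
            return bos.toByteArray();
        }
    }
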

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cc94727..b347aa6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -515,6 +515,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
 
         HintedHandOffManager.instance.start();
+        BatchlogManager.instance.start();
 
         // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
         // If we are a seed, or if the user manually sets auto_bootstrap to false,
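
BatchlogManager.instance.start() hooks batch replay into node initialization,
right next to hinted handoff. As a rough mental model only (hypothetical names;
the actual implementation is the new
src/java/org/apache/cassandra/db/BatchlogManager.java), start() schedules a
periodic task that re-applies any batch whose writes were never acknowledged:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ReplayLoopSketch
    {
        private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

        public void start()
        {
            // Periodically scan the local batchlog table and replay batches
            // that were written but never acknowledged as applied.
            executor.scheduleWithFixedDelay(new Runnable()
            {
                public void run()
                {
                    replayFailedBatches();
                }
            }, 60, 60, TimeUnit.SECONDS);
        }

        private void replayFailedBatches()
        {
            // read stored batches, re-send their mutations, delete on success
        }
    }
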

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 44c6038..40f6681 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -28,7 +28,6 @@ import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
@@ -670,8 +668,10 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
-    throws RequestValidationException, UnavailableException, TimedOutException
+    private List<IMutation> createMutationList(ConsistencyLevel consistency_level,
+                                               Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map,
+                                               boolean allowCounterMutations)
+    throws RequestValidationException
     {
         List<String> cfamsSeen = new ArrayList<String>();
         List<IMutation> rowMutations = new ArrayList<IMutation>();
@@ -731,11 +731,17 @@ public class CassandraServer implements Cassandra.Iface
             }
             if (rmStandard != null && !rmStandard.isEmpty())
                 rowMutations.add(rmStandard);
+
             if (rmCounter != null && !rmCounter.isEmpty())
-                rowMutations.add(new org.apache.cassandra.db.CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
+            {
+                if (allowCounterMutations)
+                    rowMutations.add(new CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
+                else
+                    throw new org.apache.cassandra.exceptions.InvalidRequestException("Counter mutations are not allowed in atomic batches");
+            }
         }
 
-        doInsert(consistency_level, rowMutations);
+        return rowMutations;
     }
 
     public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
@@ -759,7 +765,40 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_batch_mutate(mutation_map, consistency_level);
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true));
+        }
+        catch (RequestValidationException e)
+        {
+            throw ThriftConversion.toThrift(e);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
+    }
+
+    public void atomic_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (startSessionIfRequested())
+        {
+            Map<String, String> traceParameters = Maps.newLinkedHashMap();
+            for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry : mutation_map.entrySet())
+            {
+                traceParameters.put(ByteBufferUtil.bytesToHex(mutationEntry.getKey()),
+                        Joiner.on(";").withKeyValueSeparator(":").join(mutationEntry.getValue()));
+            }
+            traceParameters.put("consistency_level", consistency_level.name());
+            Tracing.instance().begin("atomic_batch_mutate", traceParameters);
+        }
+        else
+        {
+            logger.debug("atomic_batch_mutate");
+        }
+
+        try
+        {
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true);
         }
         catch (RequestValidationException e)
         {
@@ -825,6 +864,12 @@ public class CassandraServer implements Cassandra.Iface
     private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations)
     throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
     {
+        doInsert(consistency_level, mutations, false);
+    }
+
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically)
+    throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
+    {
         org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
         consistencyLevel.validateForWrite(state().getKeyspace());
         if (mutations.isEmpty())
@@ -833,7 +878,10 @@ public class CassandraServer implements Cassandra.Iface
         schedule(DatabaseDescriptor.getWriteRpcTimeout());
         try
         {
-            StorageProxy.mutate(mutations, consistencyLevel);
+            if (mutateAtomically)
+                StorageProxy.mutateAtomically((List<RowMutation>) mutations, consistencyLevel);
+            else
+                StorageProxy.mutate(mutations, consistencyLevel);
         }
         catch (RequestExecutionException e)
         {
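
For client authors: atomic_batch_mutate takes exactly the same arguments as
batch_mutate; the differences are that counter mutations are rejected and the
batch is pre-written to the batchlog before being applied, so a timed-out
batch will still be replayed. A minimal, illustrative client sketch (assumes a
connected Cassandra.Client with set_keyspace already called and an existing
column family named "Users"; the key, column and value names are made up):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.thrift.*;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class AtomicBatchExample
    {
        public static void writeAtomically(Cassandra.Client client) throws Exception
        {
            Column col = new Column(ByteBufferUtil.bytes("email"));
            col.setValue(ByteBufferUtil.bytes("user@example.com"));
            col.setTimestamp(System.currentTimeMillis() * 1000); // microseconds by convention

            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(col));

            Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap =
                new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
            mutationMap.put(ByteBufferUtil.bytes("key1"),
                            Collections.singletonMap("Users", Arrays.asList(mutation)));

            // Same signature as batch_mutate; throws InvalidRequestException if
            // the map contains counter mutations.
            client.atomic_batch_mutate(mutationMap, ConsistencyLevel.QUORUM);
        }
    }
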

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index ac823e9..aa7d236 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -86,7 +86,10 @@ public class ThriftConversion
     {
         TimedOutException toe = new TimedOutException();
         if (e instanceof WriteTimeoutException)
+        {
             toe.setAcknowledged_by(((WriteTimeoutException)e).received);
+            toe.setAcknowledged_by_batchlog(((WriteTimeoutException)e).writtenToBatchlog);
+        }
         return toe;
     }
 }
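
The new acknowledged_by_batchlog flag tells the client which side of the
batchlog write a timeout happened on. A hedged handling sketch (getter names
follow Thrift's generated conventions for an optional bool field; "client" and
"mutationMap" are as in the earlier example):

    try
    {
        client.atomic_batch_mutate(mutationMap, ConsistencyLevel.QUORUM);
    }
    catch (TimedOutException toe)
    {
        if (toe.isSetAcknowledged_by_batchlog() && toe.isAcknowledged_by_batchlog())
        {
            // The batchlog write succeeded before the timeout: Cassandra will
            // replay the batch on its own, so retrying only duplicates work.
        }
        else
        {
            // Timed out before (or without) a batchlog write: the outcome is
            // unknown, but a retry is safe since the batch applies atomically.
        }
    }
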

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b38ca287/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index c21e727..ecb387b 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -79,7 +79,7 @@ public class ErrorMessage extends Message.Response
                         ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
                         int received = body.readInt();
                         int blockFor = body.readInt();
-                        te = new WriteTimeoutException(cl, received, blockFor);
+                        te = new WriteTimeoutException(cl, received, blockFor, false);
                     }
                     break;
                 case READ_TIMEOUT: