You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/11/15 05:36:31 UTC

svn commit: r1542168 [2/3] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-common/src/main/j...

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutationProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutationProtos.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutationProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutationProtos.java Fri Nov 15 04:36:30 2013
@@ -711,6 +711,26 @@ public final class MultiRowMutationProto
      */
     org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProtoOrBuilder getMutationRequestOrBuilder(
         int index);
+
+    // optional uint64 nonce_group = 2;
+    /**
+     * <code>optional uint64 nonce_group = 2;</code>
+     */
+    boolean hasNonceGroup();
+    /**
+     * <code>optional uint64 nonce_group = 2;</code>
+     */
+    long getNonceGroup();
+
+    // optional uint64 nonce = 3;
+    /**
+     * <code>optional uint64 nonce = 3;</code>
+     */
+    boolean hasNonce();
+    /**
+     * <code>optional uint64 nonce = 3;</code>
+     */
+    long getNonce();
   }
   /**
    * Protobuf type {@code MutateRowsRequest}
@@ -771,6 +791,16 @@ public final class MultiRowMutationProto
               mutationRequest_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.PARSER, extensionRegistry));
               break;
             }
+            case 16: {
+              bitField0_ |= 0x00000001;
+              nonceGroup_ = input.readUInt64();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000002;
+              nonce_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -813,6 +843,7 @@ public final class MultiRowMutationProto
       return PARSER;
     }
 
+    private int bitField0_;
     // repeated .MutationProto mutation_request = 1;
     public static final int MUTATION_REQUEST_FIELD_NUMBER = 1;
     private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto> mutationRequest_;
@@ -849,8 +880,42 @@ public final class MultiRowMutationProto
       return mutationRequest_.get(index);
     }
 
+    // optional uint64 nonce_group = 2;
+    public static final int NONCE_GROUP_FIELD_NUMBER = 2;
+    private long nonceGroup_;
+    /**
+     * <code>optional uint64 nonce_group = 2;</code>
+     */
+    public boolean hasNonceGroup() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional uint64 nonce_group = 2;</code>
+     */
+    public long getNonceGroup() {
+      return nonceGroup_;
+    }
+
+    // optional uint64 nonce = 3;
+    public static final int NONCE_FIELD_NUMBER = 3;
+    private long nonce_;
+    /**
+     * <code>optional uint64 nonce = 3;</code>
+     */
+    public boolean hasNonce() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional uint64 nonce = 3;</code>
+     */
+    public long getNonce() {
+      return nonce_;
+    }
+
     private void initFields() {
       mutationRequest_ = java.util.Collections.emptyList();
+      nonceGroup_ = 0L;
+      nonce_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -873,6 +938,12 @@ public final class MultiRowMutationProto
       for (int i = 0; i < mutationRequest_.size(); i++) {
         output.writeMessage(1, mutationRequest_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(2, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(3, nonce_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -886,6 +957,14 @@ public final class MultiRowMutationProto
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, mutationRequest_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(2, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(3, nonce_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -911,6 +990,16 @@ public final class MultiRowMutationProto
       boolean result = true;
       result = result && getMutationRequestList()
           .equals(other.getMutationRequestList());
+      result = result && (hasNonceGroup() == other.hasNonceGroup());
+      if (hasNonceGroup()) {
+        result = result && (getNonceGroup()
+            == other.getNonceGroup());
+      }
+      result = result && (hasNonce() == other.hasNonce());
+      if (hasNonce()) {
+        result = result && (getNonce()
+            == other.getNonce());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -928,6 +1017,14 @@ public final class MultiRowMutationProto
         hash = (37 * hash) + MUTATION_REQUEST_FIELD_NUMBER;
         hash = (53 * hash) + getMutationRequestList().hashCode();
       }
+      if (hasNonceGroup()) {
+        hash = (37 * hash) + NONCE_GROUP_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonceGroup());
+      }
+      if (hasNonce()) {
+        hash = (37 * hash) + NONCE_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonce());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1044,6 +1141,10 @@ public final class MultiRowMutationProto
         } else {
           mutationRequestBuilder_.clear();
         }
+        nonceGroup_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        nonce_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1071,6 +1172,7 @@ public final class MultiRowMutationProto
       public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest buildPartial() {
         org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest result = new org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (mutationRequestBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             mutationRequest_ = java.util.Collections.unmodifiableList(mutationRequest_);
@@ -1080,6 +1182,15 @@ public final class MultiRowMutationProto
         } else {
           result.mutationRequest_ = mutationRequestBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.nonceGroup_ = nonceGroup_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.nonce_ = nonce_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -1121,6 +1232,12 @@ public final class MultiRowMutationProto
             }
           }
         }
+        if (other.hasNonceGroup()) {
+          setNonceGroup(other.getNonceGroup());
+        }
+        if (other.hasNonce()) {
+          setNonce(other.getNonce());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1394,6 +1511,72 @@ public final class MultiRowMutationProto
         return mutationRequestBuilder_;
       }
 
+      // optional uint64 nonce_group = 2;
+      private long nonceGroup_ ;
+      /**
+       * <code>optional uint64 nonce_group = 2;</code>
+       */
+      public boolean hasNonceGroup() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional uint64 nonce_group = 2;</code>
+       */
+      public long getNonceGroup() {
+        return nonceGroup_;
+      }
+      /**
+       * <code>optional uint64 nonce_group = 2;</code>
+       */
+      public Builder setNonceGroup(long value) {
+        bitField0_ |= 0x00000002;
+        nonceGroup_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonce_group = 2;</code>
+       */
+      public Builder clearNonceGroup() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        nonceGroup_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional uint64 nonce = 3;
+      private long nonce_ ;
+      /**
+       * <code>optional uint64 nonce = 3;</code>
+       */
+      public boolean hasNonce() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional uint64 nonce = 3;</code>
+       */
+      public long getNonce() {
+        return nonce_;
+      }
+      /**
+       * <code>optional uint64 nonce = 3;</code>
+       */
+      public Builder setNonce(long value) {
+        bitField0_ |= 0x00000004;
+        nonce_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonce = 3;</code>
+       */
+      public Builder clearNonce() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        nonce_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:MutateRowsRequest)
     }
 
@@ -2006,13 +2189,14 @@ public final class MultiRowMutationProto
     java.lang.String[] descriptorData = {
       "\n\026MultiRowMutation.proto\032\014Client.proto\"\"" +
       "\n MultiRowMutationProcessorRequest\"#\n!Mu" +
-      "ltiRowMutationProcessorResponse\"=\n\021Mutat" +
+      "ltiRowMutationProcessorResponse\"a\n\021Mutat" +
       "eRowsRequest\022(\n\020mutation_request\030\001 \003(\0132\016" +
-      ".MutationProto\"\024\n\022MutateRowsResponse2P\n\027" +
-      "MultiRowMutationService\0225\n\nMutateRows\022\022." +
-      "MutateRowsRequest\032\023.MutateRowsResponseBL" +
-      "\n*org.apache.hadoop.hbase.protobuf.gener" +
-      "atedB\026MultiRowMutationProtosH\001\210\001\001\240\001\001"
+      ".MutationProto\022\023\n\013nonce_group\030\002 \001(\004\022\r\n\005n" +
+      "once\030\003 \001(\004\"\024\n\022MutateRowsResponse2P\n\027Mult" +
+      "iRowMutationService\0225\n\nMutateRows\022\022.Muta" +
+      "teRowsRequest\032\023.MutateRowsResponseBL\n*or" +
+      "g.apache.hadoop.hbase.protobuf.generated" +
+      "B\026MultiRowMutationProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2036,7 +2220,7 @@ public final class MultiRowMutationProto
           internal_static_MutateRowsRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_MutateRowsRequest_descriptor,
-              new java.lang.String[] { "MutationRequest", });
+              new java.lang.String[] { "MutationRequest", "NonceGroup", "Nonce", });
           internal_static_MutateRowsResponse_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_MutateRowsResponse_fieldAccessorTable = new

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java Fri Nov 15 04:36:30 2013
@@ -50,6 +50,26 @@ public final class RowProcessorProtos {
      * <code>optional bytes row_processor_initializer_message = 3;</code>
      */
     com.google.protobuf.ByteString getRowProcessorInitializerMessage();
+
+    // optional uint64 nonce_group = 4;
+    /**
+     * <code>optional uint64 nonce_group = 4;</code>
+     */
+    boolean hasNonceGroup();
+    /**
+     * <code>optional uint64 nonce_group = 4;</code>
+     */
+    long getNonceGroup();
+
+    // optional uint64 nonce = 5;
+    /**
+     * <code>optional uint64 nonce = 5;</code>
+     */
+    boolean hasNonce();
+    /**
+     * <code>optional uint64 nonce = 5;</code>
+     */
+    long getNonce();
   }
   /**
    * Protobuf type {@code ProcessRequest}
@@ -117,6 +137,16 @@ public final class RowProcessorProtos {
               rowProcessorInitializerMessage_ = input.readBytes();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              nonceGroup_ = input.readUInt64();
+              break;
+            }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              nonce_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -259,10 +289,44 @@ public final class RowProcessorProtos {
       return rowProcessorInitializerMessage_;
     }
 
+    // optional uint64 nonce_group = 4;
+    public static final int NONCE_GROUP_FIELD_NUMBER = 4;
+    private long nonceGroup_;
+    /**
+     * <code>optional uint64 nonce_group = 4;</code>
+     */
+    public boolean hasNonceGroup() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional uint64 nonce_group = 4;</code>
+     */
+    public long getNonceGroup() {
+      return nonceGroup_;
+    }
+
+    // optional uint64 nonce = 5;
+    public static final int NONCE_FIELD_NUMBER = 5;
+    private long nonce_;
+    /**
+     * <code>optional uint64 nonce = 5;</code>
+     */
+    public boolean hasNonce() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional uint64 nonce = 5;</code>
+     */
+    public long getNonce() {
+      return nonce_;
+    }
+
     private void initFields() {
       rowProcessorClassName_ = "";
       rowProcessorInitializerMessageName_ = "";
       rowProcessorInitializerMessage_ = com.google.protobuf.ByteString.EMPTY;
+      nonceGroup_ = 0L;
+      nonce_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -289,6 +353,12 @@ public final class RowProcessorProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, rowProcessorInitializerMessage_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(4, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeUInt64(5, nonce_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -310,6 +380,14 @@ public final class RowProcessorProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, rowProcessorInitializerMessage_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(4, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(5, nonce_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -348,6 +426,16 @@ public final class RowProcessorProtos {
         result = result && getRowProcessorInitializerMessage()
             .equals(other.getRowProcessorInitializerMessage());
       }
+      result = result && (hasNonceGroup() == other.hasNonceGroup());
+      if (hasNonceGroup()) {
+        result = result && (getNonceGroup()
+            == other.getNonceGroup());
+      }
+      result = result && (hasNonce() == other.hasNonce());
+      if (hasNonce()) {
+        result = result && (getNonce()
+            == other.getNonce());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -373,6 +461,14 @@ public final class RowProcessorProtos {
         hash = (37 * hash) + ROW_PROCESSOR_INITIALIZER_MESSAGE_FIELD_NUMBER;
         hash = (53 * hash) + getRowProcessorInitializerMessage().hashCode();
       }
+      if (hasNonceGroup()) {
+        hash = (37 * hash) + NONCE_GROUP_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonceGroup());
+      }
+      if (hasNonce()) {
+        hash = (37 * hash) + NONCE_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonce());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -488,6 +584,10 @@ public final class RowProcessorProtos {
         bitField0_ = (bitField0_ & ~0x00000002);
         rowProcessorInitializerMessage_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000004);
+        nonceGroup_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        nonce_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -528,6 +628,14 @@ public final class RowProcessorProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.rowProcessorInitializerMessage_ = rowProcessorInitializerMessage_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.nonceGroup_ = nonceGroup_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.nonce_ = nonce_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -557,6 +665,12 @@ public final class RowProcessorProtos {
         if (other.hasRowProcessorInitializerMessage()) {
           setRowProcessorInitializerMessage(other.getRowProcessorInitializerMessage());
         }
+        if (other.hasNonceGroup()) {
+          setNonceGroup(other.getNonceGroup());
+        }
+        if (other.hasNonce()) {
+          setNonce(other.getNonce());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -772,6 +886,72 @@ public final class RowProcessorProtos {
         return this;
       }
 
+      // optional uint64 nonce_group = 4;
+      private long nonceGroup_ ;
+      /**
+       * <code>optional uint64 nonce_group = 4;</code>
+       */
+      public boolean hasNonceGroup() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional uint64 nonce_group = 4;</code>
+       */
+      public long getNonceGroup() {
+        return nonceGroup_;
+      }
+      /**
+       * <code>optional uint64 nonce_group = 4;</code>
+       */
+      public Builder setNonceGroup(long value) {
+        bitField0_ |= 0x00000008;
+        nonceGroup_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonce_group = 4;</code>
+       */
+      public Builder clearNonceGroup() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        nonceGroup_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional uint64 nonce = 5;
+      private long nonce_ ;
+      /**
+       * <code>optional uint64 nonce = 5;</code>
+       */
+      public boolean hasNonce() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional uint64 nonce = 5;</code>
+       */
+      public long getNonce() {
+        return nonce_;
+      }
+      /**
+       * <code>optional uint64 nonce = 5;</code>
+       */
+      public Builder setNonce(long value) {
+        bitField0_ |= 0x00000010;
+        nonce_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonce = 5;</code>
+       */
+      public Builder clearNonce() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        nonce_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:ProcessRequest)
     }
 
@@ -1479,15 +1659,16 @@ public final class RowProcessorProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\022RowProcessor.proto\"\215\001\n\016ProcessRequest\022" +
+      "\n\022RowProcessor.proto\"\261\001\n\016ProcessRequest\022" +
       " \n\030row_processor_class_name\030\001 \002(\t\022.\n&row" +
       "_processor_initializer_message_name\030\002 \001(" +
       "\t\022)\n!row_processor_initializer_message\030\003" +
-      " \001(\014\"/\n\017ProcessResponse\022\034\n\024row_processor" +
-      "_result\030\001 \002(\0142C\n\023RowProcessorService\022,\n\007" +
-      "Process\022\017.ProcessRequest\032\020.ProcessRespon" +
-      "seBH\n*org.apache.hadoop.hbase.protobuf.g" +
-      "eneratedB\022RowProcessorProtosH\001\210\001\001\240\001\001"
+      " \001(\014\022\023\n\013nonce_group\030\004 \001(\004\022\r\n\005nonce\030\005 \001(\004" +
+      "\"/\n\017ProcessResponse\022\034\n\024row_processor_res" +
+      "ult\030\001 \002(\0142C\n\023RowProcessorService\022,\n\007Proc" +
+      "ess\022\017.ProcessRequest\032\020.ProcessResponseBH" +
+      "\n*org.apache.hadoop.hbase.protobuf.gener" +
+      "atedB\022RowProcessorProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1499,7 +1680,7 @@ public final class RowProcessorProtos {
           internal_static_ProcessRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ProcessRequest_descriptor,
-              new java.lang.String[] { "RowProcessorClassName", "RowProcessorInitializerMessageName", "RowProcessorInitializerMessage", });
+              new java.lang.String[] { "RowProcessorClassName", "RowProcessorInitializerMessageName", "RowProcessorInitializerMessage", "NonceGroup", "Nonce", });
           internal_static_ProcessResponse_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_ProcessResponse_fieldAccessorTable = new

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java Fri Nov 15 04:36:30 2013
@@ -694,6 +694,26 @@ public final class WALProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
         int index);
+
+    // optional uint64 nonceGroup = 9;
+    /**
+     * <code>optional uint64 nonceGroup = 9;</code>
+     */
+    boolean hasNonceGroup();
+    /**
+     * <code>optional uint64 nonceGroup = 9;</code>
+     */
+    long getNonceGroup();
+
+    // optional uint64 nonce = 10;
+    /**
+     * <code>optional uint64 nonce = 10;</code>
+     */
+    boolean hasNonce();
+    /**
+     * <code>optional uint64 nonce = 10;</code>
+     */
+    long getNonce();
   }
   /**
    * Protobuf type {@code WALKey}
@@ -804,6 +824,16 @@ public final class WALProtos {
               clusterIds_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.PARSER, extensionRegistry));
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000040;
+              nonceGroup_ = input.readUInt64();
+              break;
+            }
+            case 80: {
+              bitField0_ |= 0x00000080;
+              nonce_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1078,6 +1108,38 @@ public final class WALProtos {
       return clusterIds_.get(index);
     }
 
+    // optional uint64 nonceGroup = 9;
+    public static final int NONCEGROUP_FIELD_NUMBER = 9;
+    private long nonceGroup_;
+    /**
+     * <code>optional uint64 nonceGroup = 9;</code>
+     */
+    public boolean hasNonceGroup() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional uint64 nonceGroup = 9;</code>
+     */
+    public long getNonceGroup() {
+      return nonceGroup_;
+    }
+
+    // optional uint64 nonce = 10;
+    public static final int NONCE_FIELD_NUMBER = 10;
+    private long nonce_;
+    /**
+     * <code>optional uint64 nonce = 10;</code>
+     */
+    public boolean hasNonce() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional uint64 nonce = 10;</code>
+     */
+    public long getNonce() {
+      return nonce_;
+    }
+
     private void initFields() {
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -1087,6 +1149,8 @@ public final class WALProtos {
       scopes_ = java.util.Collections.emptyList();
       followingKvCount_ = 0;
       clusterIds_ = java.util.Collections.emptyList();
+      nonceGroup_ = 0L;
+      nonce_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1158,6 +1222,12 @@ public final class WALProtos {
       for (int i = 0; i < clusterIds_.size(); i++) {
         output.writeMessage(8, clusterIds_.get(i));
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt64(9, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeUInt64(10, nonce_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1199,6 +1269,14 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(8, clusterIds_.get(i));
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(9, nonceGroup_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(10, nonce_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1256,6 +1334,16 @@ public final class WALProtos {
       }
       result = result && getClusterIdsList()
           .equals(other.getClusterIdsList());
+      result = result && (hasNonceGroup() == other.hasNonceGroup());
+      if (hasNonceGroup()) {
+        result = result && (getNonceGroup()
+            == other.getNonceGroup());
+      }
+      result = result && (hasNonce() == other.hasNonce());
+      if (hasNonce()) {
+        result = result && (getNonce()
+            == other.getNonce());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1301,6 +1389,14 @@ public final class WALProtos {
         hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
         hash = (53 * hash) + getClusterIdsList().hashCode();
       }
+      if (hasNonceGroup()) {
+        hash = (37 * hash) + NONCEGROUP_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonceGroup());
+      }
+      if (hasNonce()) {
+        hash = (37 * hash) + NONCE_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNonce());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1445,6 +1541,10 @@ public final class WALProtos {
         } else {
           clusterIdsBuilder_.clear();
         }
+        nonceGroup_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000100);
+        nonce_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000200);
         return this;
       }
 
@@ -1519,6 +1619,14 @@ public final class WALProtos {
         } else {
           result.clusterIds_ = clusterIdsBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.nonceGroup_ = nonceGroup_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.nonce_ = nonce_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1605,6 +1713,12 @@ public final class WALProtos {
             }
           }
         }
+        if (other.hasNonceGroup()) {
+          setNonceGroup(other.getNonceGroup());
+        }
+        if (other.hasNonce()) {
+          setNonce(other.getNonce());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2614,6 +2728,72 @@ public final class WALProtos {
         return clusterIdsBuilder_;
       }
 
+      // optional uint64 nonceGroup = 9;
+      private long nonceGroup_ ;
+      /**
+       * <code>optional uint64 nonceGroup = 9;</code>
+       */
+      public boolean hasNonceGroup() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional uint64 nonceGroup = 9;</code>
+       */
+      public long getNonceGroup() {
+        return nonceGroup_;
+      }
+      /**
+       * <code>optional uint64 nonceGroup = 9;</code>
+       */
+      public Builder setNonceGroup(long value) {
+        bitField0_ |= 0x00000100;
+        nonceGroup_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonceGroup = 9;</code>
+       */
+      public Builder clearNonceGroup() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        nonceGroup_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional uint64 nonce = 10;
+      private long nonce_ ;
+      /**
+       * <code>optional uint64 nonce = 10;</code>
+       */
+      public boolean hasNonce() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional uint64 nonce = 10;</code>
+       */
+      public long getNonce() {
+        return nonce_;
+      }
+      /**
+       * <code>optional uint64 nonce = 10;</code>
+       */
+      public Builder setNonce(long value) {
+        bitField0_ |= 0x00000200;
+        nonce_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 nonce = 10;</code>
+       */
+      public Builder clearNonce() {
+        bitField0_ = (bitField0_ & ~0x00000200);
+        nonce_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:WALKey)
     }
 
@@ -4812,22 +4992,23 @@ public final class WALProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\tWAL.proto\032\013HBase.proto\"$\n\tWALHeader\022\027\n" +
-      "\017has_compression\030\001 \001(\010\"\337\001\n\006WALKey\022\033\n\023enc" +
+      "\017has_compression\030\001 \001(\010\"\202\002\n\006WALKey\022\033\n\023enc" +
       "oded_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002" +
       "(\014\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite" +
       "_time\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002" +
       "\030\001\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022foll" +
       "owing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(" +
-      "\0132\005.UUID\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022" +
-      "\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Comp" +
-      "actionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023",
-      "encoded_region_name\030\002 \002(\014\022\023\n\013family_name" +
-      "\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021comp" +
-      "action_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 " +
-      "\002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
-      "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
-      "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" +
-      "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n " +
+      "\001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n\nsc" +
+      "ope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compactio",
+      "nDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023encod" +
+      "ed_region_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(" +
+      "\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compactio" +
+      "n_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\"\014" +
+      "\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICATION" +
+      "_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOB" +
+      "AL\020\001B?\n*org.apache.hadoop.hbase.protobuf" +
+      ".generatedB\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4845,7 +5026,7 @@ public final class WALProtos {
           internal_static_WALKey_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALKey_descriptor,
-              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", });
+              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", });
           internal_static_FamilyScope_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_FamilyScope_fieldAccessorTable = new

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Fri Nov 15 04:36:30 2013
@@ -133,6 +133,8 @@ message MutationProto {
   // 'cell' field above which is non-null when the cells are pb'd.
   optional int32 associated_cell_count = 8;
 
+  optional uint64 nonce = 9;
+
   enum Durability {
     USE_DEFAULT  = 0;
     SKIP_WAL     = 1;
@@ -182,6 +184,7 @@ message MutateRequest {
   required RegionSpecifier region = 1;
   required MutationProto mutation = 2;
   optional Condition condition = 3;
+  optional uint64 nonce_group = 4;
 }
 
 message MutateResponse {
@@ -345,6 +348,7 @@ message RegionActionResult {
  */
 message MultiRequest {
   repeated RegionAction regionAction = 1;
+  optional uint64 nonceGroup = 2;
 }
 
 message MultiResponse {

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto Fri Nov 15 04:36:30 2013
@@ -30,6 +30,8 @@ message MultiRowMutationProcessorRespons
 
 message MutateRowsRequest {
   repeated MutationProto mutation_request = 1;
+  optional uint64 nonce_group = 2;
+  optional uint64 nonce = 3;
 }
 
 message MutateRowsResponse {

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto Fri Nov 15 04:36:30 2013
@@ -30,6 +30,8 @@ message ProcessRequest {
   required string row_processor_class_name = 1;
   optional string row_processor_initializer_message_name = 2;
   optional bytes  row_processor_initializer_message = 3;
+  optional uint64 nonce_group = 4;
+  optional uint64 nonce = 5;
 }
 
 message ProcessResponse {

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto Fri Nov 15 04:36:30 2013
@@ -43,14 +43,19 @@ message WALKey {
 
   repeated FamilyScope scopes = 6;
   optional uint32 following_kv_count = 7;
+
   /*
   This field contains the list of clusters that have
   consumed the change
   */
   repeated UUID cluster_ids = 8;
+
+  optional uint64 nonceGroup = 9;
+  optional uint64 nonce = 10;
+
 /*
   optional CustomEntryType custom_entry_type = 9;
-  
+
   enum CustomEntryType {
     COMPACTION = 0;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Fri Nov 15 04:36:30 2013
@@ -380,4 +380,9 @@ public class CoprocessorHConnection impl
   public boolean isDeadServer(ServerName serverName) {
     return delegate.isDeadServer(serverName);
   }
+
+  @Override
+  public NonceGenerator getNonceGenerator() {
+    return null; // don't use nonces for coprocessor connection
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java Fri Nov 15 04:36:30 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
@@ -68,7 +69,9 @@ extends RowProcessorService implements C
     try {
       RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
       HRegion region = env.getRegion();
-      region.processRowsWithLocks(processor);
+      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
+      region.processRowsWithLocks(processor, nonceGroup, nonce);
       T result = processor.getResult();
       ProcessResponse.Builder b = ProcessResponse.newBuilder();
       b.setRowProcessorResult(result.toByteString());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java Fri Nov 15 04:36:30 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -80,7 +81,7 @@ public class MultiRowMutationEndpoint ex
 CoprocessorService, Coprocessor {
   private RegionCoprocessorEnvironment env;
   @Override
-  public void mutateRows(RpcController controller, MutateRowsRequest request, 
+  public void mutateRows(RpcController controller, MutateRowsRequest request,
       RpcCallback<MutateRowsResponse> done) {
     MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
     try {
@@ -110,7 +111,9 @@ CoprocessorService, Coprocessor {
         rowsToLock.add(m.getRow());
       }
       // call utility method on region
-      env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
+      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
+      env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Fri Nov 15 04:36:30 2013
@@ -31,6 +31,7 @@ import java.util.UUID;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@@ -86,6 +87,7 @@ public class ReplicationProtbufUtil {
     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
     for (HLog.Entry entry: entries) {
       entryBuilder.clear();
+      // TODO: this duplicates a lot in HLogKey#getBuilder
       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
       HLogKey key = entry.getKey();
       keyBuilder.setEncodedRegionName(
@@ -93,6 +95,12 @@ public class ReplicationProtbufUtil {
       keyBuilder.setTableName(ZeroCopyLiteralByteString.wrap(key.getTablename().getName()));
       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
       keyBuilder.setWriteTime(key.getWriteTime());
+      if (key.getNonce() != HConstants.NO_NONCE) {
+        keyBuilder.setNonce(key.getNonce());
+      }
+      if (key.getNonceGroup() != HConstants.NO_NONCE) {
+        keyBuilder.setNonceGroup(key.getNonceGroup());
+      }
       for(UUID clusterId : key.getClusterIds()) {
         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Nov 15 04:36:30 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -122,6 +123,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -1897,7 +1900,7 @@ public class HRegion implements HeapSize
    * accumulating status codes and tracking the index at which processing
    * is proceeding.
    */
-  private static class BatchOperationInProgress<T> {
+  private abstract static class BatchOperationInProgress<T> {
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
@@ -1910,11 +1913,84 @@ public class HRegion implements HeapSize
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
     }
 
+    public abstract Mutation getMutation(int index);
+    public abstract long getNonceGroup(int index);
+    public abstract long getNonce(int index);
+    /** This method is potentially expensive and should only be used for non-replay CP path. */
+    public abstract Mutation[] getMutationsForCoprocs();
+    public abstract boolean isInReplay();
+
     public boolean isDone() {
       return nextIndexToProcess == operations.length;
     }
   }
 
+  private static class MutationBatch extends BatchOperationInProgress<Mutation> {
+    private long nonceGroup;
+    private long nonce;
+    public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
+      super(operations);
+      this.nonceGroup = nonceGroup;
+      this.nonce = nonce;
+    }
+
+    public Mutation getMutation(int index) {
+      return this.operations[index];
+    }
+
+    @Override
+    public long getNonceGroup(int index) {
+      return nonceGroup;
+    }
+
+    @Override
+    public long getNonce(int index) {
+      return nonce;
+    }
+
+    @Override
+    public Mutation[] getMutationsForCoprocs() {
+      return this.operations;
+    }
+
+    @Override
+    public boolean isInReplay() {
+      return false;
+    }
+  }
+
+  private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
+    public ReplayBatch(MutationReplay[] operations) {
+      super(operations);
+    }
+
+    @Override
+    public Mutation getMutation(int index) {
+      return this.operations[index].mutation;
+    }
+
+    @Override
+    public long getNonceGroup(int index) {
+      return this.operations[index].nonceGroup;
+    }
+
+    @Override
+    public long getNonce(int index) {
+      return this.operations[index].nonce;
+    }
+
+    @Override
+    public Mutation[] getMutationsForCoprocs() {
+      assert false;
+      throw new RuntimeException("Should not be called for replay batch");
+    }
+
+    @Override
+    public boolean isInReplay() {
+      return true;
+    }
+  }
+
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -1923,8 +1999,28 @@ public class HRegion implements HeapSize
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
+  public OperationStatus[] batchMutate(
+      Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
+    // As it stands, this is used for 3 things
+    //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
+    //  * coprocessor calls (see ex. BulkDeleteEndpoint).
+    // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
+    return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
+  }
+
   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
-    return batchMutate(mutations, false);
+    return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  /**
+   * Replay a batch of mutations.
+   * @param mutations mutations to replay.
+   * @return
+   * @throws IOException
+   */
+  public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
+      throws IOException {
+    return batchMutate(new ReplayBatch(mutations));
   }
 
   /**
@@ -1935,21 +2031,16 @@ public class HRegion implements HeapSize
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
-      throws IOException {
-    BatchOperationInProgress<Mutation> batchOp =
-      new BatchOperationInProgress<Mutation>(mutations);
-
+  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
     boolean initialized = false;
-
     while (!batchOp.isDone()) {
-      if (!isReplay) {
+      if (!batchOp.isInReplay()) {
         checkReadOnly();
       }
       checkResources();
 
       long newSize;
-      if (isReplay) {
+      if (batchOp.isInReplay()) {
         startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
       } else {
         startRegionOperation(Operation.BATCH_MUTATE);
@@ -1957,13 +2048,13 @@ public class HRegion implements HeapSize
 
       try {
         if (!initialized) {
-          if (!isReplay) {
+          if (!batchOp.isInReplay()) {
             this.writeRequestsCount.increment();
             doPreMutationHook(batchOp);
           }
           initialized = true;
         }
-        long addedSize = doMiniBatchMutation(batchOp, isReplay);
+        long addedSize = doMiniBatchMutation(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
         closeRegionOperation();
@@ -1976,13 +2067,13 @@ public class HRegion implements HeapSize
   }
 
 
-  private void doPreMutationHook(BatchOperationInProgress<Mutation> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Mutation m = batchOp.operations[i];
+        Mutation m = batchOp.getMutation(i);
         if (m instanceof Put) {
           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
             // pre hook says skip this Put
@@ -2011,9 +2102,8 @@ public class HRegion implements HeapSize
   }
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,
-      boolean isInReplay) throws IOException {
-
+  private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
+    boolean isInReplay = batchOp.isInReplay();
     // variable to note if all Put items are for the same CF -- metrics related
     boolean putsCfSetConsistent = true;
     //The set of columnFamilies first seen for Put.
@@ -2023,6 +2113,7 @@ public class HRegion implements HeapSize
     //The set of columnFamilies first seen for Delete.
     Set<byte[]> deletesCfSet = null;
 
+    long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
     WALEdit walEdit = new WALEdit(isInReplay);
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
@@ -2046,7 +2137,7 @@ public class HRegion implements HeapSize
       int numReadyToWrite = 0;
       long now = EnvironmentEdgeManager.currentTimeMillis();
       while (lastIndexExclusive < batchOp.operations.length) {
-        Mutation mutation = batchOp.operations[lastIndexExclusive];
+        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
         boolean isPutMutation = mutation instanceof Put;
 
         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
@@ -2145,7 +2236,7 @@ public class HRegion implements HeapSize
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) continue;
 
-        Mutation mutation = batchOp.operations[i];
+        Mutation mutation = batchOp.getMutation(i);
         if (mutation instanceof Put) {
           updateKVTimestamps(familyMaps[i].values(), byteNow);
           noOfPuts++;
@@ -2167,7 +2258,7 @@ public class HRegion implements HeapSize
       // calling the pre CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
+          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
       }
@@ -2193,6 +2284,7 @@ public class HRegion implements HeapSize
       // ------------------------------------
       // STEP 4. Build WAL edit
       // ----------------------------------
+      boolean hasWalAppends = false;
       Durability durability = Durability.USE_DEFAULT;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
@@ -2202,7 +2294,7 @@ public class HRegion implements HeapSize
         }
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-        Mutation m = batchOp.operations[i];
+        Mutation m = batchOp.getMutation(i);
         Durability tmpDur = getEffectiveDurability(m.getDurability());
         if (tmpDur.ordinal() > durability.ordinal()) {
           durability = tmpDur;
@@ -2212,6 +2304,22 @@ public class HRegion implements HeapSize
           continue;
         }
 
+        long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
+        // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
+        // Given how nonces are originally written, these should be contiguous.
+        // txid should always increase, so having the last one is ok.
+        if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
+          if (walEdit.size() > 0) {
+            assert isInReplay;
+            txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
+                  walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
+                  currentNonceGroup, currentNonce);
+            hasWalAppends = true;
+          }
+          currentNonceGroup = nonceGroup;
+          currentNonce = nonce;
+        }
+
         // Add WAL edits by CP
         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
         if (fromCP != null) {
@@ -2223,12 +2331,14 @@ public class HRegion implements HeapSize
       }
 
       // -------------------------
-      // STEP 5. Append the edit to WAL. Do not sync wal.
+      // STEP 5. Append the final edit to WAL. Do not sync wal.
       // -------------------------
-      Mutation mutation = batchOp.operations[firstIndex];
+      Mutation mutation = batchOp.getMutation(firstIndex);
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-              walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId);
+              walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
+              true, currentNonceGroup, currentNonce);
+        hasWalAppends = true;
       }
 
       // -------------------------------
@@ -2243,14 +2353,14 @@ public class HRegion implements HeapSize
       // -------------------------
       // STEP 7. Sync wal.
       // -------------------------
-      if (walEdit.size() > 0) {
+      if (hasWalAppends) {
         syncOrDefer(txid, durability);
       }
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
+          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
@@ -2274,7 +2384,7 @@ public class HRegion implements HeapSize
               != OperationStatusCode.SUCCESS) {
             continue;
           }
-          Mutation m = batchOp.operations[i];
+          Mutation m = batchOp.getMutation(i);
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
           } else {
@@ -2362,10 +2472,10 @@ public class HRegion implements HeapSize
     boolean isPut = w instanceof Put;
     if (!isPut && !(w instanceof Delete))
       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
-      		"be Put or Delete");
+          "be Put or Delete");
     if (!Bytes.equals(row, w.getRow())) {
       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
-      		"getRow must match the passed row");
+          "getRow must match the passed row");
     }
 
     startRegionOperation();
@@ -2435,9 +2545,10 @@ public class HRegion implements HeapSize
     }
   }
 
-  private void doBatchMutate(Mutation mutation) throws IOException,
-      org.apache.hadoop.hbase.DoNotRetryIOException {
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
+  private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
+    // Currently this is only called for puts and deletes, so no nonces.
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
+        HConstants.NO_NONCE, HConstants.NO_NONCE);
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -2628,7 +2739,7 @@ public class HRegion implements HeapSize
    * called when a Put/Delete has updated memstore but subequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Mutation> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
                                 Map<byte[], List<Cell>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -2901,6 +3012,7 @@ public class HRegion implements HeapSize
       HLog.Entry entry;
       Store store = null;
       boolean reported_once = false;
+      ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
 
       try {
         // How many edits seen before we check elapsed time
@@ -2916,6 +3028,10 @@ public class HRegion implements HeapSize
           HLogKey key = entry.getKey();
           WALEdit val = entry.getEdit();
 
+          if (ng != null) { // some test, or nonces disabled
+            ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
+          }
+
           if (reporter != null) {
             intervalEdits += val.size();
             if (intervalEdits >= interval) {
@@ -4395,35 +4511,47 @@ public class HRegion implements HeapSize
   }
 
   public void mutateRow(RowMutations rm) throws IOException {
+    // Don't need nonces here - RowMutations only supports puts and deletes
     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
   }
 
   /**
+   * Perform atomic mutations within the region w/o nonces.
+   * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
+   */
+  public void mutateRowsWithLocks(Collection<Mutation> mutations,
+      Collection<byte[]> rowsToLock) throws IOException {
+    mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  /**
    * Perform atomic mutations within the region.
    * @param mutations The list of mutations to perform.
    * <code>mutations</code> can contain operations for multiple rows.
    * Caller has to ensure that all rows are contained in this region.
    * @param rowsToLock Rows to lock
+   * @param nonceGroup Optional nonce group of the operation (client Id)
+   * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
    * If multiple rows are locked care should be taken that
    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
    * @throws IOException
    */
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
-      Collection<byte[]> rowsToLock) throws IOException {
-
-    MultiRowMutationProcessor proc =
-        new MultiRowMutationProcessor(mutations, rowsToLock);
-    processRowsWithLocks(proc, -1);
+      Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
+    MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
+    processRowsWithLocks(proc, -1, nonceGroup, nonce);
   }
 
   /**
    * Performs atomic multiple reads and writes on a given row.
    *
    * @param processor The object defines the reads and writes to a row.
+   * @param nonceGroup Optional nonce group of the operation (client Id)
+   * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
    */
-  public void processRowsWithLocks(RowProcessor<?,?> processor)
+  public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
       throws IOException {
-    processRowsWithLocks(processor, rowProcessorTimeout);
+    processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
   }
 
   /**
@@ -4432,9 +4560,11 @@ public class HRegion implements HeapSize
    * @param processor The object defines the reads and writes to a row.
    * @param timeout The timeout of the processor.process() execution
    *                Use a negative number to switch off the time bound
+   * @param nonceGroup Optional nonce group of the operation (client Id)
+   * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
    */
-  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
-      throws IOException {
+  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
+      long nonceGroup, long nonce) throws IOException {
 
     for (byte[] row : processor.getRowsToLock()) {
       checkRow(row, "processRowsWithLocks");
@@ -4506,7 +4636,7 @@ public class HRegion implements HeapSize
           if (!walEdit.isEmpty()) {
             txid = this.log.appendNoSync(this.getRegionInfo(),
               this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
-              this.htableDescriptor, this.sequenceId);
+              this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
           }
           // 8. Release region lock
           if (locked) {
@@ -4609,6 +4739,10 @@ public class HRegion implements HeapSize
     }
   }
 
+  public Result append(Append append) throws IOException {
+    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
@@ -4618,7 +4752,7 @@ public class HRegion implements HeapSize
    * @return new keyvalues after increment
    * @throws IOException
    */
-  public Result append(Append append)
+  public Result append(Append append, long nonceGroup, long nonce)
       throws IOException {
     byte[] row = append.getRow();
     checkRow(row, "append");
@@ -4749,7 +4883,8 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(),
               this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
-              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId);
+              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
+              true, nonceGroup, nonce);
           } else {
             recordMutationWithoutWal(append.getFamilyCellMap());
           }
@@ -4801,13 +4936,17 @@ public class HRegion implements HeapSize
     return append.isReturnResults() ? Result.create(allKVs) : null;
   }
 
+  public Result increment(Increment increment) throws IOException {
+    return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
   /**
    * Perform one or more increment operations on a row.
    * @param increment
    * @return new keyvalues after increment
    * @throws IOException
    */
-  public Result increment(Increment increment)
+  public Result increment(Increment increment, long nonceGroup, long nonce)
   throws IOException {
     byte [] row = increment.getRow();
     checkRow(row, "increment");
@@ -4923,7 +5062,8 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(),
               this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
-              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId);
+              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
+              true, nonceGroup, nonce);
           } else {
             recordMutationWithoutWal(increment.getFamilyCellMap());
           }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Nov 15 04:36:30 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -93,6 +94,7 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -441,6 +443,9 @@ public class HRegionServer implements Cl
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
 
+  /** The nonce manager chore. */
+  private Chore nonceManagerChore;
+
   /**
    * The server name the Master sees us as.  Its made from the hostname the
    * master passes us, port, and server startcode. Gets set after registration
@@ -490,6 +495,26 @@ public class HRegionServer implements Cl
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;
 
+  /**
+   * Nonce manager. Nonces are used to make operations like increment and append idempotent
+   * in the case where the client doesn't receive the response from a successful operation and
+   * retries. We track the successful ops for some time via a nonce sent by client and handle
+   * duplicate operations (currently, by failing them; in future we might use MVCC to return
+   * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
+   * HBASE-3787) are:
+   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
+   *   of past records. If we don't read the records, we don't read and recover the nonces.
+   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
+   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
+   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
+   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
+   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
+   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
+   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
+   * latest nonce in it expired. It can also be recovered during move.
+   */
+  private final ServerNonceManager nonceManager;
+
   private UserProvider userProvider;
 
   /**
@@ -529,6 +554,9 @@ public class HRegionServer implements Cl
 
     this.sleeper = new Sleeper(this.msgInterval, this);
 
+    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
+    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
+
     this.maxScannerResultSize = conf.getLong(
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
@@ -789,6 +817,11 @@ public class HRegionServer implements Cl
     // Create the thread to clean the moved regions list
     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
 
+    if (this.nonceManager != null) {
+      // Create the chore that cleans up nonces.
+      nonceManagerChore = this.nonceManager.createCleanupChore(this);
+    }
+
     // Setup RPC client for master communication
     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
         this.isa.getAddress(), 0));
@@ -910,6 +943,9 @@ public class HRegionServer implements Cl
     if (this.healthCheckChore != null) {
       this.healthCheckChore.interrupt();
     }
+    if (this.nonceManagerChore != null) {
+      this.nonceManagerChore.interrupt();
+    }
 
     // Stop the snapshot handler, forcefully killing all running tasks
     try {
@@ -1556,8 +1592,11 @@ public class HRegionServer implements Cl
     Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
         ".periodicFlusher", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
-    Threads
-        .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
+      Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
+            uncaughtExceptionHandler);
+    }
+    if (this.nonceManagerChore != null) {
+      Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), n + ".nonceCleaner",
             uncaughtExceptionHandler);
     }
 
@@ -1811,6 +1850,9 @@ public class HRegionServer implements Cl
    * have already been called.
    */
   protected void join() {
+    if (this.nonceManagerChore != null) {
+      Threads.shutdown(this.nonceManagerChore.getThread());
+    }
     Threads.shutdown(this.compactionChecker.getThread());
     Threads.shutdown(this.periodicFlusher.getThread());
     this.cacheFlusher.join();
@@ -2826,15 +2868,19 @@ public class HRegionServer implements Cl
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
+      long nonceGroup = request.hasNonceGroup()
+          ? request.getNonceGroup() : HConstants.NO_NONCE;
       Result r = null;
       Boolean processed = null;
       MutationType type = mutation.getMutateType();
       switch (type) {
       case APPEND:
-        r = append(region, mutation, cellScanner);
+        // TODO: this doesn't actually check anything.
+        r = append(region, mutation, cellScanner, nonceGroup);
         break;
       case INCREMENT:
-        r = increment(region, mutation, cellScanner);
+        // TODO: this doesn't actually check anything.
+        r = increment(region, mutation, cellScanner, nonceGroup);
         break;
       case PUT:
         Put put = ProtobufUtil.toPut(mutation, cellScanner);
@@ -3251,6 +3297,8 @@ public class HRegionServer implements Cl
     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
     if (controller != null) controller.setCellScanner(null);
 
+    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+
     // this will contain all the cells that we need to return. It's created later, if needed.
     List<CellScannable> cellsToReturn = null;
     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
@@ -3279,7 +3327,7 @@ public class HRegionServer implements Cl
       } else {
         // doNonAtomicRegionMutation manages the exception internally
         cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
-            regionActionResultBuilder, cellsToReturn);
+            regionActionResultBuilder, cellsToReturn, nonceGroup);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
     }
@@ -3303,7 +3351,7 @@ public class HRegionServer implements Cl
    */
   private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
       final RegionAction actions, final CellScanner cellScanner,
-      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn) {
+      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
     // one at a time, we instead pass them in batch.  Be aware that the corresponding
     // ResultOrException instance that matches each Put or Delete is then added down in the
@@ -3326,10 +3374,10 @@ public class HRegionServer implements Cl
           }
           switch (type) {
           case APPEND:
-            r = append(region, action.getMutation(), cellScanner);
+            r = append(region, action.getMutation(), cellScanner, nonceGroup);
             break;
           case INCREMENT:
-            r = increment(region, action.getMutation(), cellScanner);
+            r = increment(region, action.getMutation(), cellScanner,  nonceGroup);
             break;
           case PUT:
           case DELETE:
@@ -3862,12 +3910,18 @@ public class HRegionServer implements Cl
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
       RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
-      List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
+      List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
       for (WALEntry entry : entries) {
+        if (nonceManager != null) {
+          long nonceGroup = entry.getKey().hasNonceGroup()
+              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
+          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
+          nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
+        }
         Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<HLogKey, WALEdit>();
-        List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry);
+        List<HLogSplitter.MutationReplay> edits =
+            HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
           // KeyValue.
@@ -3882,7 +3936,7 @@ public class HRegionServer implements Cl
       }
 
       if (!mutations.isEmpty()) {
-        OperationStatus[] result = doBatchOp(region, mutations, true);
+        OperationStatus[] result = doReplayBatchOp(region, mutations);
         // check if it's a partial success
         for (int i = 0; result != null && i < result.length; i++) {
           if (result[i] != OperationStatus.SUCCESS) {
@@ -3987,7 +4041,7 @@ public class HRegionServer implements Cl
    * @throws IOException
    */
   protected Result append(final HRegion region,
-      final MutationProto m, final CellScanner cellScanner) throws IOException {
+      final MutationProto m, final CellScanner cellScanner, long nonceGroup) throws IOException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
     Append append = ProtobufUtil.toAppend(m, cellScanner);
     Result r = null;
@@ -3995,7 +4049,14 @@ public class HRegionServer implements Cl
       r = region.getCoprocessorHost().preAppend(append);
     }
     if (r == null) {
-      r = region.append(append);
+      long nonce = startNonceOperation(m, nonceGroup);
+      boolean success = false;
+      try {
+        r = region.append(append, nonceGroup, nonce);
+        success = true;
+      } finally {
+        endNonceOperation(m, nonceGroup, success);
+      }
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postAppend(append, r);
       }
@@ -4013,8 +4074,7 @@ public class HRegionServer implements Cl
    * @throws IOException
    */
   protected Result increment(final HRegion region, final MutationProto mutation,
-      final CellScanner cells)
-  throws IOException {
+      final CellScanner cells, long nonceGroup) throws IOException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
     Result r = null;
@@ -4022,7 +4082,14 @@ public class HRegionServer implements Cl
       r = region.getCoprocessorHost().preIncrement(increment);
     }
     if (r == null) {
-      r = region.increment(increment);
+      long nonce = startNonceOperation(mutation, nonceGroup);
+      boolean success = false;
+      try {
+        r = region.increment(increment, nonceGroup, nonce);
+        success = true;
+      } finally {
+        endNonceOperation(mutation, nonceGroup, success);
+      }
       if (region.getCoprocessorHost() != null) {
         r = region.getCoprocessorHost().postIncrement(increment, r);
       }
@@ -4032,6 +4099,49 @@ public class HRegionServer implements Cl
   }
 
   /**
+   * Starts the nonce operation for a mutation, if needed.
+   * @param mutation Mutation.
+   * @param nonceGroup Nonce group from the request.
+   * @return Nonce used (can be NO_NONCE).
+   */
+  private long startNonceOperation(final MutationProto mutation, long nonceGroup)
+      throws IOException, OperationConflictException {
+    if (nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
+    boolean canProceed = false;
+    try {
+      canProceed = nonceManager.startOperation(nonceGroup, mutation.getNonce(), this);
+    } catch (InterruptedException ex) {
+      // Probably should not happen.
+      throw new InterruptedIOException("Nonce start operation interrupted");
+    }
+    if (!canProceed) {
+      // TODO: instead, we could convert append/increment to get w/mvcc
+      String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
+          + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
+          + "] may have already completed";
+      throw new OperationConflictException(message);
+    }
+    return mutation.getNonce();
+  }
+
+  /**
+   * Ends nonce operation for a mutation, if needed.
+   * @param mutation Mutation.
+   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
+   * @param success Whether the operation for this nonce has succeeded.
+   */
+  private void endNonceOperation(final MutationProto mutation, long nonceGroup,
+      boolean success) {
+    if (nonceManager == null || !mutation.hasNonce()) return;
+    nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
+  }
+
+  @Override
+  public ServerNonceManager getNonceManager() {
+    return this.nonceManager;
+  }
+
+  /**
    * Execute a list of Put/Delete mutations.
    *
    * @param builder
@@ -4063,7 +4173,7 @@ public class HRegionServer implements Cl
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mArray, false);
+      OperationStatus codes[] = region.batchMutate(mArray);
       for (i = 0; i < codes.length; i++) {
         int index = mutations.get(i).getIndex();
         Exception e = null;
@@ -4119,32 +4229,31 @@ public class HRegionServer implements Cl
    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
    * @param region
    * @param mutations
-   * @param isReplay
    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
    *         exceptionMessage if any
    * @throws IOException
    */
-  protected OperationStatus [] doBatchOp(final HRegion region,
-      final List<Pair<MutationType, Mutation>> mutations, boolean isReplay)
-  throws IOException {
-    Mutation[] mArray = new Mutation[mutations.size()];
+  protected OperationStatus [] doReplayBatchOp(final HRegion region,
+      final List<HLogSplitter.MutationReplay> mutations) throws IOException {
+    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
+
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
       int i = 0;
-      for (Pair<MutationType, Mutation> m : mutations) {
-        if (m.getFirst() == MutationType.PUT) {
+      for (HLogSplitter.MutationReplay m : mutations) {
+        if (m.type == MutationType.PUT) {
           batchContainsPuts = true;
         } else {
           batchContainsDelete = true;
         }
-        mArray[i++] = m.getSecond();
+        mArray[i++] = m;
       }
       requestCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.batchMutate(mArray, isReplay);
+      return region.batchReplay(mArray);
     } finally {
       long after = EnvironmentEdgeManager.currentTimeMillis();
       if (batchContainsPuts) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1542168&r1=1542167&r2=1542168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Fri Nov 15 04:36:30 2013
@@ -115,4 +115,10 @@ public interface RegionServerServices
    * @return set of recovering regions on the hosting region server
    */
   Map<String, HRegion> getRecoveringRegions();
+
+  /**
+   * Only required for "old" log replay; if it's removed, remove this.
+   * @return The RegionServer's NonceManager
+   */
+  public ServerNonceManager getNonceManager();
 }