You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2015/07/13 18:14:50 UTC

[8/8] hbase git commit: HBASE-13415 Procedure v2 - Use nonces for double submits from client (Stephen Yuan Jiang)

HBASE-13415 Procedure v2 - Use nonces for double submits from client (Stephen Yuan Jiang)

Signed-off-by: Sean Busbey <bu...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/951ec7a0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/951ec7a0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/951ec7a0

Branch: refs/heads/master
Commit: 951ec7a0b7ade9ea8aa0b112537a267b5ce693c3
Parents: 60d6435
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jul 9 07:44:36 2015 -0700
Committer: Sean Busbey <bu...@apache.org>
Committed: Mon Jul 13 11:02:30 2015 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |   36 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |   69 +-
 .../org/apache/hadoop/hbase/util/NonceKey.java  |   65 +
 .../hadoop/hbase/procedure2/Procedure.java      |   27 +
 .../hbase/procedure2/ProcedureExecutor.java     |   89 +-
 .../hbase/procedure2/ProcedureResult.java       |   34 +-
 .../procedure2/ProcedureTestingUtility.java     |   11 +-
 .../hbase/procedure2/TestProcedureRecovery.java |   39 +-
 .../hbase/protobuf/generated/MasterProtos.java  | 2102 ++++++++++++++++--
 .../protobuf/generated/ProcedureProtos.java     |  257 ++-
 hbase-protocol/src/main/protobuf/Master.proto   |   18 +
 .../src/main/protobuf/Procedure.proto           |    4 +
 .../org/apache/hadoop/hbase/master/HMaster.java |  107 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |   52 +-
 .../hadoop/hbase/master/MasterServices.java     |   70 +-
 .../hbase/regionserver/ServerNonceManager.java  |   33 +-
 .../security/access/AccessControlLists.java     |    4 +-
 .../visibility/VisibilityController.java        |    2 +-
 .../hbase/client/TestHBaseAdminNoCluster.java   |    5 +
 .../master/TestAssignmentManagerOnCluster.java  |    2 +-
 .../hadoop/hbase/master/TestCatalogJanitor.java |   56 +-
 .../MasterProcedureTestingUtility.java          |    9 +
 .../procedure/TestAddColumnFamilyProcedure.java |   94 +-
 .../procedure/TestCreateTableProcedure.java     |   46 +-
 .../TestDeleteColumnFamilyProcedure.java        |  102 +-
 .../procedure/TestDeleteTableProcedure.java     |   48 +-
 .../procedure/TestDisableTableProcedure.java    |   44 +-
 .../procedure/TestEnableTableProcedure.java     |   47 +-
 .../TestModifyColumnFamilyProcedure.java        |   45 +-
 .../procedure/TestModifyTableProcedure.java     |   27 +-
 .../procedure/TestTruncateTableProcedure.java   |   15 +-
 31 files changed, 3042 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index a06fb2c..87b8278 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -205,6 +205,8 @@ public class HBaseAdmin implements Admin {
 
   private RpcRetryingCallerFactory rpcCallerFactory;
 
+  private NonceGenerator ng;
+
   /**
    * Constructor.
    * See {@link #HBaseAdmin(Connection connection)}
@@ -259,6 +261,8 @@ public class HBaseAdmin implements Admin {
       "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
+
+    this.ng = this.connection.getNonceGenerator();
   }
 
   @Override
@@ -618,7 +622,8 @@ public class HBaseAdmin implements Admin {
         new MasterCallable<CreateTableResponse>(getConnection()) {
       @Override
       public CreateTableResponse call(int callTimeout) throws ServiceException {
-        CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
+        CreateTableRequest request = RequestConverter.buildCreateTableRequest(
+          desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
         return master.createTable(null, request);
       }
     });
@@ -706,7 +711,8 @@ public class HBaseAdmin implements Admin {
         new MasterCallable<DeleteTableResponse>(getConnection()) {
       @Override
       public DeleteTableResponse call(int callTimeout) throws ServiceException {
-        DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
+        DeleteTableRequest req =
+            RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.deleteTable(null,req);
       }
     });
@@ -829,9 +835,9 @@ public class HBaseAdmin implements Admin {
         executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
           @Override
           public TruncateTableResponse call(int callTimeout) throws ServiceException {
-            LOG.info("Started enable of " + tableName);
-            TruncateTableRequest req =
-                RequestConverter.buildTruncateTableRequest(tableName, preserveSplits);
+            LOG.info("Started truncating " + tableName);
+            TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
+              tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
             return master.truncateTable(null, req);
           }
         });
@@ -992,7 +998,8 @@ public class HBaseAdmin implements Admin {
       @Override
       public EnableTableResponse call(int callTimeout) throws ServiceException {
         LOG.info("Started enable of " + tableName);
-        EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
+        EnableTableRequest req =
+            RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.enableTable(null,req);
       }
     });
@@ -1129,7 +1136,8 @@ public class HBaseAdmin implements Admin {
       @Override
       public DisableTableResponse call(int callTimeout) throws ServiceException {
         LOG.info("Started disable of " + tableName);
-        DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
+        DisableTableRequest req =
+            RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.disableTable(null, req);
       }
     });
@@ -1411,7 +1419,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, columnFamily);
+        AddColumnRequest req = RequestConverter.buildAddColumnRequest(
+          tableName, columnFamily, ng.getNonceGroup(), ng.newNonce());
         master.addColumn(null, req);
         return null;
       }
@@ -1479,8 +1488,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        DeleteColumnRequest req =
-          RequestConverter.buildDeleteColumnRequest(tableName, columnFamily);
+        DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(
+          tableName, columnFamily, ng.getNonceGroup(), ng.newNonce());
         master.deleteColumn(null, req);
         return null;
       }
@@ -1548,8 +1557,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        ModifyColumnRequest req =
-          RequestConverter.buildModifyColumnRequest(tableName, columnFamily);
+        ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(
+          tableName, columnFamily, ng.getNonceGroup(), ng.newNonce());
         master.modifyColumn(null,req);
         return null;
       }
@@ -2398,7 +2407,8 @@ public class HBaseAdmin implements Admin {
         new MasterCallable<ModifyTableResponse>(getConnection()) {
       @Override
       public ModifyTableResponse call(int callTimeout) throws ServiceException {
-        ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
+        ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
+          tableName, htd, ng.getNonceGroup(), ng.newNonce());
         return master.modifyTable(null, request);
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 5a63b23..96260fd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -1041,10 +1041,15 @@ public final class RequestConverter {
    * @return an AddColumnRequest
    */
   public static AddColumnRequest buildAddColumnRequest(
-      final TableName tableName, final HColumnDescriptor column) {
+      final TableName tableName,
+      final HColumnDescriptor column,
+      final long nonceGroup,
+      final long nonce) {
     AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
     builder.setColumnFamilies(column.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1056,10 +1061,15 @@ public final class RequestConverter {
    * @return a DeleteColumnRequest
    */
   public static DeleteColumnRequest buildDeleteColumnRequest(
-      final TableName tableName, final byte [] columnName) {
+      final TableName tableName,
+      final byte [] columnName,
+      final long nonceGroup,
+      final long nonce) {
     DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setColumnName(ByteStringer.wrap(columnName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1071,10 +1081,15 @@ public final class RequestConverter {
    * @return an ModifyColumnRequest
    */
   public static ModifyColumnRequest buildModifyColumnRequest(
-      final TableName tableName, final HColumnDescriptor column) {
+      final TableName tableName,
+      final HColumnDescriptor column,
+      final long nonceGroup,
+      final long nonce) {
     ModifyColumnRequest.Builder builder = ModifyColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setColumnFamilies(column.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1156,9 +1171,14 @@ public final class RequestConverter {
    * @param tableName
    * @return a DeleteTableRequest
    */
-  public static DeleteTableRequest buildDeleteTableRequest(final TableName tableName) {
+  public static DeleteTableRequest buildDeleteTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     DeleteTableRequest.Builder builder = DeleteTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1169,11 +1189,16 @@ public final class RequestConverter {
    * @param preserveSplits True if the splits should be preserved
    * @return a TruncateTableRequest
    */
-  public static TruncateTableRequest buildTruncateTableRequest(final TableName tableName,
-        boolean preserveSplits) {
+  public static TruncateTableRequest buildTruncateTableRequest(
+      final TableName tableName,
+      final boolean preserveSplits,
+      final long nonceGroup,
+      final long nonce) {
     TruncateTableRequest.Builder builder = TruncateTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
     builder.setPreserveSplits(preserveSplits);
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1183,9 +1208,14 @@ public final class RequestConverter {
    * @param tableName
    * @return an EnableTableRequest
    */
-  public static EnableTableRequest buildEnableTableRequest(final TableName tableName) {
+  public static EnableTableRequest buildEnableTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     EnableTableRequest.Builder builder = EnableTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1195,9 +1225,14 @@ public final class RequestConverter {
    * @param tableName
    * @return a DisableTableRequest
    */
-  public static DisableTableRequest buildDisableTableRequest(final TableName tableName) {
+  public static DisableTableRequest buildDisableTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     DisableTableRequest.Builder builder = DisableTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1209,7 +1244,10 @@ public final class RequestConverter {
    * @return a CreateTableRequest
    */
   public static CreateTableRequest buildCreateTableRequest(
-      final HTableDescriptor hTableDesc, final byte [][] splitKeys) {
+      final HTableDescriptor hTableDesc,
+      final byte [][] splitKeys,
+      final long nonceGroup,
+      final long nonce) {
     CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
     builder.setTableSchema(hTableDesc.convert());
     if (splitKeys != null) {
@@ -1217,6 +1255,8 @@ public final class RequestConverter {
         builder.addSplitKeys(ByteStringer.wrap(splitKey));
       }
     }
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1229,10 +1269,15 @@ public final class RequestConverter {
    * @return a ModifyTableRequest
    */
   public static ModifyTableRequest buildModifyTableRequest(
-      final TableName tableName, final HTableDescriptor hTableDesc) {
+      final TableName tableName,
+      final HTableDescriptor hTableDesc,
+      final long nonceGroup,
+      final long nonce) {
     ModifyTableRequest.Builder builder = ModifyTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setTableSchema(hTableDesc.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1347,7 +1392,9 @@ public final class RequestConverter {
    * @param synchronous
    * @return a SetBalancerRunningRequest
    */
-  public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) {
+  public static SetBalancerRunningRequest buildSetBalancerRunningRequest(
+      boolean on,
+      boolean synchronous) {
     return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
new file mode 100644
index 0000000..9c7c72a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This implementation is not smart and just treats nonce group and nonce as random bits.
+ */
+// TODO: we could use pure byte arrays, but then we wouldn't be able to use a hash map.
+@InterfaceAudience.Private
+public class NonceKey {
+  private long group;
+  private long nonce;
+
+  public NonceKey(long group, long nonce) {
+    assert nonce != HConstants.NO_NONCE;
+    this.group = group;
+    this.nonce = nonce;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof NonceKey)) {
+      return false;
+    }
+    NonceKey nk = ((NonceKey)obj);
+    return this.nonce == nk.nonce && this.group == nk.group;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)((group >> 32) ^ group ^ (nonce >> 32) ^ nonce);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + group + ":" + nonce + "]";
+  }
+
+  public long getNonceGroup() {
+    return group;
+  }
+
+  public long getNonce() {
+    return nonce;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 601ebcd..a343c89 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -78,6 +80,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   private RemoteProcedureException exception = null;
   private byte[] result = null;
 
+  private NonceKey nonceKey = null;
+
   /**
    * The main code of the procedure. It must be idempotent since execute()
    * may be called multiple time in case of machine failure in the middle
@@ -262,6 +266,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     return parentProcId;
   }
 
+  public NonceKey getNonceKey() {
+    return nonceKey;
+  }
+
   /**
    * @return true if the procedure has failed.
    *         true may mean failed but not yet rolledback or failed and rolledback.
@@ -414,6 +422,15 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   }
 
   /**
+   * Called by the ProcedureExecutor to set the nonce key on a newly created procedure.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setNonceKey(final NonceKey nonceKey) {
+    this.nonceKey = nonceKey;
+  }
+
+  /**
    * Internal method called by the ProcedureExecutor that starts the
    * user-level code execute().
    */
@@ -661,6 +678,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       builder.setStateData(stateStream.toByteString());
     }
 
+    if (proc.getNonceKey() != null) {
+      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
+      builder.setNonce(proc.getNonceKey().getNonce());
+    }
+
     return builder.build();
   }
 
@@ -712,6 +734,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       proc.setResult(proto.getResult().toByteArray());
     }
 
+    if (proto.getNonce() != HConstants.NO_NONCE) {
+      NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
+      proc.setNonceKey(nonceKey);
+    }
+
     // we want to call deserialize even when the stream is empty, mainly for testing.
     proc.deserializeStateData(proto.getStateData().newInput());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 49b658b..01e9a37 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
@@ -134,14 +136,17 @@ public class ProcedureExecutor<TEnvironment> {
     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
 
     private final Map<Long, ProcedureResult> completed;
+    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
     private final ProcedureStore store;
     private final Configuration conf;
 
     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
-        final Map<Long, ProcedureResult> completedMap) {
+        final Map<Long, ProcedureResult> completedMap,
+        final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
       // set the timeout interval that triggers the periodic-procedure
       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
       this.completed = completedMap;
+      this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
       this.store = store;
       this.conf = conf;
     }
@@ -171,6 +176,11 @@ public class ProcedureExecutor<TEnvironment> {
           }
           store.delete(entry.getKey());
           it.remove();
+
+          NonceKey nonceKey = result.getNonceKey();
+          if (nonceKey != null) {
+            nonceKeysToProcIdsMap.remove(nonceKey);
+          }
         }
       }
     }
@@ -225,6 +235,13 @@ public class ProcedureExecutor<TEnvironment> {
     new ConcurrentHashMap<Long, Procedure>();
 
   /**
+   * Helper map to look up whether a procedure was already submitted by the same client.
+   * This map contains every root procedure that was submitted with a nonce.
+   */
+  private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
+      new ConcurrentHashMap<NonceKey, Long>();
+
+  /**
    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
    * or periodic procedures.
    */
@@ -312,6 +329,11 @@ public class ProcedureExecutor<TEnvironment> {
       proc.beforeReplay(getEnvironment());
       procedures.put(proc.getProcId(), proc);
 
+      // add the nonce to the map
+      if (proc.getNonceKey() != null) {
+        nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
+      }
+
       if (proc.getState() == ProcedureState.RUNNABLE) {
         runnablesCount++;
       }
@@ -343,6 +365,7 @@ public class ProcedureExecutor<TEnvironment> {
         assert !rollbackStack.containsKey(proc.getProcId());
         procedures.remove(proc.getProcId());
         completed.put(proc.getProcId(), newResultFromProcedure(proc));
+
         continue;
       }
 
@@ -479,7 +502,8 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // Add completed cleaner
-    waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed));
+    waitingTimeout.add(
+      new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -510,6 +534,7 @@ public class ProcedureExecutor<TEnvironment> {
     completed.clear();
     rollbackStack.clear();
     procedures.clear();
+    nonceKeysToProcIdsMap.clear();
     waitingTimeout.clear();
     runnables.clear();
     lastProcId.set(-1);
@@ -552,13 +577,53 @@ public class ProcedureExecutor<TEnvironment> {
    * @return the procedure id, that can be used to monitor the operation
    */
   public long submitProcedure(final Procedure proc) {
+    return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  /**
+   * Add a new root-procedure to the executor.
+   * @param proc the new procedure to execute.
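+   * @param nonceGroup the nonce group of the client request
+   * @param nonce the request nonce, or HConstants.NO_NONCE to skip the duplicate-submit check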
+   * @return the procedure id, that can be used to monitor the operation
+   */
+  public long submitProcedure(
+      final Procedure proc,
+      final long nonceGroup,
+      final long nonce) {
     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
     Preconditions.checkArgument(isRunning());
     Preconditions.checkArgument(lastProcId.get() >= 0);
     Preconditions.checkArgument(!proc.hasParent());
 
-    // Initialize the Procedure ID
-    proc.setProcId(nextProcId());
+    Long currentProcId;
+
+    // The following part of the code has to be synchronized to prevent multiple requests
+    // with the same nonce from executing at the same time.
+    synchronized (this) {
+      // Check whether the proc exists.  If it does, just return its proc id.
+      // This prevents the same proc from being submitted multiple times (which could happen
+      // when the client could not talk to the server and resubmitted the same request).
+      NonceKey noncekey = null;
+      if (nonce != HConstants.NO_NONCE) {
+        noncekey = new NonceKey(nonceGroup, nonce);
+        currentProcId = nonceKeysToProcIdsMap.get(noncekey);
+        if (currentProcId != null) {
+          // Found the proc
+          return currentProcId;
+        }
+      }
+
+      // Initialize the Procedure ID
+      currentProcId = nextProcId();
+      proc.setProcId(currentProcId);
+
+      // This is a new procedure. Set the nonce key and insert it into the map.
+      if (noncekey != null) {
+        proc.setNonceKey(noncekey);
+        nonceKeysToProcIdsMap.put(noncekey, currentProcId);
+      }
+    } // end of synchronized (this)
 
     // Commit the transaction
     store.insert(proc, null);
@@ -568,14 +633,14 @@ public class ProcedureExecutor<TEnvironment> {
 
     // Create the rollback stack for the procedure
     RootProcedureState stack = new RootProcedureState();
-    rollbackStack.put(proc.getProcId(), stack);
+    rollbackStack.put(currentProcId, stack);
 
     // Submit the new subprocedures
-    assert !procedures.containsKey(proc.getProcId());
-    procedures.put(proc.getProcId(), proc);
-    sendProcedureAddedNotification(proc.getProcId());
+    assert !procedures.containsKey(currentProcId);
+    procedures.put(currentProcId, proc);
+    sendProcedureAddedNotification(currentProcId);
     runnables.addBack(proc);
-    return proc.getProcId();
+    return currentProcId;
   }
 
   public ProcedureResult getResult(final long procId) {
@@ -1162,8 +1227,10 @@ public class ProcedureExecutor<TEnvironment> {
 
   private static ProcedureResult newResultFromProcedure(final Procedure proc) {
     if (proc.isFailed()) {
-      return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException());
+      return new ProcedureResult(
+        proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getException());
     }
-    return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
+    return new ProcedureResult(
+      proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
index 98c293b..ff5407f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.NonceKey;
 
 /**
  * Once a Procedure completes the ProcedureExecutor takes all the useful
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ProcedureResult {
+  private final NonceKey nonceKey;
   private final RemoteProcedureException exception;
   private final long lastUpdate;
   private final long startTime;
@@ -37,21 +39,39 @@ public class ProcedureResult {
 
   private long clientAckTime = -1;
 
-  public ProcedureResult(final long startTime, final long lastUpdate,
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final long startTime,
+      final long lastUpdate,
       final RemoteProcedureException exception) {
-    this.lastUpdate = lastUpdate;
-    this.startTime = startTime;
-    this.exception = exception;
-    this.result = null;
+    this(nonceKey, exception, lastUpdate, startTime, null);
+  }
+
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final long startTime,
+      final long lastUpdate,
+      final byte[] result) {
+    this(nonceKey, null, lastUpdate, startTime, result);
   }
 
-  public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) {
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final RemoteProcedureException exception,
+      final long lastUpdate,
+      final long startTime,
+      final byte[] result) {
+    this.nonceKey = nonceKey;
+    this.exception = exception;
     this.lastUpdate = lastUpdate;
     this.startTime = startTime;
-    this.exception = null;
     this.result = result;
   }
 
+  public NonceKey getNonceKey() {
+    return nonceKey;
+  }
+
   public boolean isFailed() {
     return exception != null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 66eb3ea..34774ed 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
@@ -119,7 +120,7 @@ public class ProcedureTestingUtility {
     procStore.start(1);
     procExecutor.start(1, false);
     try {
-      return submitAndWait(procExecutor, proc);
+      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
     } finally {
       procStore.stop(false);
       procExecutor.stop();
@@ -127,7 +128,13 @@ public class ProcedureTestingUtility {
   }
 
   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
-    long procId = procExecutor.submitProcedure(proc);
+    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
+      final long nonceGroup,
+      final long nonce) {
+    long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce);
     waitProcedure(procExecutor, procId);
     return procId;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/951ec7a0/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 49bea5b..24a448e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Assert;
@@ -77,6 +76,9 @@ public class TestProcedureRecovery {
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
+
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
   }
 
   @After
@@ -286,6 +288,41 @@ public class TestProcedureRecovery {
     ProcedureTestingUtility.assertIsAbortException(result);
   }
 
+  @Test(timeout=30000)
+  public void testCompletedProcWithSameNonce() throws Exception {
+    final long nonceGroup = 123;
+    final long nonce = 2222;
+    Procedure proc = new TestSingleStepProcedure();
+    // Submit a proc and wait for its completion
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+    // Restart
+    restart();
+    Procedure proc2 = new TestSingleStepProcedure();
+    // Submit a procedure with the same nonce; expect the original procedure id to be returned.
+    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    assertTrue(procId == procId2);
+
+    ProcedureResult result = procExecutor.getResult(procId2);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+  }
+
+  @Test(timeout=30000)
+  public void testRunningProcWithSameNonce() throws Exception {
+    final long nonceGroup = 456;
+    final long nonce = 33333;
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+    // Restart
+    restart();
+    Procedure proc2 = new TestMultiStepProcedure();
+    // Submit a procedure with the same nonce; expect the original procedure id to be returned.
+    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    // The original proc is not completed and the new submission should have the same proc Id.
+    assertTrue(procId == procId2);
+  }
+
   public static class TestStateMachineProcedure
       extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
     enum State { STATE_1, STATE_2, STATE_3, DONE }