You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/06 16:26:18 UTC
[04/12] hbase git commit: HBASE-13415 Procedure V2 - Use nonces for
double submits from client (Stephen Yuan Jiang)
HBASE-13415 Procedure V2 - Use nonces for double submits from client (Stephen Yuan Jiang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0c900fe7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0c900fe7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0c900fe7
Branch: refs/heads/branch-1.1
Commit: 0c900fe7eebf5d5253c4f2bf69f04127dcf0c80a
Parents: bab0f0f
Author: Stephen Yuan Jiang <sy...@gmail.com>
Authored: Tue Jan 5 23:55:16 2016 -0800
Committer: Stephen Yuan Jiang <sy...@gmail.com>
Committed: Tue Jan 5 23:55:16 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/HBaseAdmin.java | 30 +-
.../hadoop/hbase/protobuf/RequestConverter.java | 69 +-
.../org/apache/hadoop/hbase/util/NonceKey.java | 65 +
.../hadoop/hbase/procedure2/Procedure.java | 29 +-
.../hbase/procedure2/ProcedureExecutor.java | 92 +-
.../hbase/procedure2/ProcedureResult.java | 34 +-
.../procedure2/ProcedureTestingUtility.java | 9 +-
.../hbase/procedure2/TestProcedureRecovery.java | 39 +-
.../hbase/protobuf/generated/MasterProtos.java | 2091 ++++++++++++++++--
.../protobuf/generated/ProcedureProtos.java | 257 ++-
hbase-protocol/src/main/protobuf/Master.proto | 18 +
.../src/main/protobuf/Procedure.proto | 4 +
.../org/apache/hadoop/hbase/master/HMaster.java | 114 +-
.../hadoop/hbase/master/MasterRpcServices.java | 51 +-
.../hadoop/hbase/master/MasterServices.java | 70 +-
.../hbase/regionserver/ServerNonceManager.java | 33 +-
.../security/access/AccessControlLists.java | 4 +-
.../visibility/VisibilityController.java | 2 +-
.../hbase/client/TestHBaseAdminNoCluster.java | 5 +
.../master/TestAssignmentManagerOnCluster.java | 2 +-
.../hadoop/hbase/master/TestCatalogJanitor.java | 62 +-
.../hadoop/hbase/master/TestProcedureConf.java | 7 +-
.../MasterProcedureTestingUtility.java | 9 +
.../procedure/TestAddColumnFamilyProcedure.java | 94 +-
.../procedure/TestCreateTableProcedure.java | 46 +-
.../TestDeleteColumnFamilyProcedure.java | 102 +-
.../procedure/TestDeleteTableProcedure.java | 48 +-
.../procedure/TestDisableTableProcedure.java | 44 +-
.../procedure/TestEnableTableProcedure.java | 47 +-
.../TestModifyColumnFamilyProcedure.java | 45 +-
.../procedure/TestModifyTableProcedure.java | 27 +-
.../procedure/TestTruncateTableProcedure.java | 15 +-
32 files changed, 3049 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 5a0def3..38a980b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -206,6 +206,8 @@ public class HBaseAdmin implements Admin {
private RpcRetryingCallerFactory rpcCallerFactory;
+ private NonceGenerator ng;
+
/**
* Constructor.
* See {@link #HBaseAdmin(Connection connection)}
@@ -261,6 +263,8 @@ public class HBaseAdmin implements Admin {
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
+
+ this.ng = this.connection.getNonceGenerator();
}
@Override
@@ -637,7 +641,8 @@ public class HBaseAdmin implements Admin {
new MasterCallable<CreateTableResponse>(getConnection()) {
@Override
public CreateTableResponse call(int callTimeout) throws ServiceException {
- CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
+ CreateTableRequest request = RequestConverter.buildCreateTableRequest(
+ desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
return master.createTable(null, request);
}
});
@@ -807,7 +812,8 @@ public class HBaseAdmin implements Admin {
new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
public DeleteTableResponse call(int callTimeout) throws ServiceException {
- DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
+ DeleteTableRequest req =
+ RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
return master.deleteTable(null,req);
}
});
@@ -919,7 +925,7 @@ public class HBaseAdmin implements Admin {
@Override
public Void call(int callTimeout) throws ServiceException {
TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
- tableName, preserveSplits);
+ tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
master.truncateTable(null, req);
return null;
}
@@ -1055,7 +1061,8 @@ public class HBaseAdmin implements Admin {
@Override
public EnableTableResponse call(int callTimeout) throws ServiceException {
LOG.info("Started enable of " + tableName);
- EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
+ EnableTableRequest req =
+ RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
return master.enableTable(null,req);
}
});
@@ -1242,7 +1249,8 @@ public class HBaseAdmin implements Admin {
@Override
public DisableTableResponse call(int callTimeout) throws ServiceException {
LOG.info("Started disable of " + tableName);
- DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
+ DisableTableRequest req =
+ RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
return master.disableTable(null, req);
}
});
@@ -1521,7 +1529,8 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
+ AddColumnRequest req = RequestConverter.buildAddColumnRequest(
+ tableName, column, ng.getNonceGroup(), ng.newNonce());
master.addColumn(null,req);
return null;
}
@@ -1568,7 +1577,8 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
+ DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(
+ tableName, columnName, ng.getNonceGroup(), ng.newNonce());
master.deleteColumn(null,req);
return null;
}
@@ -1617,7 +1627,8 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
+ ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(
+ tableName, descriptor, ng.getNonceGroup(), ng.newNonce());
master.modifyColumn(null,req);
return null;
}
@@ -2452,7 +2463,8 @@ public class HBaseAdmin implements Admin {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
- ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
+ ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
+ tableName, htd, ng.getNonceGroup(), ng.newNonce());
master.modifyTable(null, request);
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 94fb892..f2fc545 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -1060,10 +1060,15 @@ public final class RequestConverter {
* @return an AddColumnRequest
*/
public static AddColumnRequest buildAddColumnRequest(
- final TableName tableName, final HColumnDescriptor column) {
+ final TableName tableName,
+ final HColumnDescriptor column,
+ final long nonceGroup,
+ final long nonce) {
AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
builder.setColumnFamilies(column.convert());
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1075,10 +1080,15 @@ public final class RequestConverter {
* @return a DeleteColumnRequest
*/
public static DeleteColumnRequest buildDeleteColumnRequest(
- final TableName tableName, final byte [] columnName) {
+ final TableName tableName,
+ final byte [] columnName,
+ final long nonceGroup,
+ final long nonce) {
DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
builder.setColumnName(ByteStringer.wrap(columnName));
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1090,10 +1100,15 @@ public final class RequestConverter {
* @return an ModifyColumnRequest
*/
public static ModifyColumnRequest buildModifyColumnRequest(
- final TableName tableName, final HColumnDescriptor column) {
+ final TableName tableName,
+ final HColumnDescriptor column,
+ final long nonceGroup,
+ final long nonce) {
ModifyColumnRequest.Builder builder = ModifyColumnRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
builder.setColumnFamilies(column.convert());
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1175,9 +1190,14 @@ public final class RequestConverter {
* @param tableName
* @return a DeleteTableRequest
*/
- public static DeleteTableRequest buildDeleteTableRequest(final TableName tableName) {
+ public static DeleteTableRequest buildDeleteTableRequest(
+ final TableName tableName,
+ final long nonceGroup,
+ final long nonce) {
DeleteTableRequest.Builder builder = DeleteTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1188,11 +1208,16 @@ public final class RequestConverter {
* @param preserveSplits True if the splits should be preserved
* @return a TruncateTableRequest
*/
- public static TruncateTableRequest buildTruncateTableRequest(final TableName tableName,
- boolean preserveSplits) {
+ public static TruncateTableRequest buildTruncateTableRequest(
+ final TableName tableName,
+ final boolean preserveSplits,
+ final long nonceGroup,
+ final long nonce) {
TruncateTableRequest.Builder builder = TruncateTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
builder.setPreserveSplits(preserveSplits);
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1202,9 +1227,14 @@ public final class RequestConverter {
* @param tableName
* @return an EnableTableRequest
*/
- public static EnableTableRequest buildEnableTableRequest(final TableName tableName) {
+ public static EnableTableRequest buildEnableTableRequest(
+ final TableName tableName,
+ final long nonceGroup,
+ final long nonce) {
EnableTableRequest.Builder builder = EnableTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1214,9 +1244,14 @@ public final class RequestConverter {
* @param tableName
* @return a DisableTableRequest
*/
- public static DisableTableRequest buildDisableTableRequest(final TableName tableName) {
+ public static DisableTableRequest buildDisableTableRequest(
+ final TableName tableName,
+ final long nonceGroup,
+ final long nonce) {
DisableTableRequest.Builder builder = DisableTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1228,7 +1263,10 @@ public final class RequestConverter {
* @return a CreateTableRequest
*/
public static CreateTableRequest buildCreateTableRequest(
- final HTableDescriptor hTableDesc, final byte [][] splitKeys) {
+ final HTableDescriptor hTableDesc,
+ final byte [][] splitKeys,
+ final long nonceGroup,
+ final long nonce) {
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setTableSchema(hTableDesc.convert());
if (splitKeys != null) {
@@ -1236,6 +1274,8 @@ public final class RequestConverter {
builder.addSplitKeys(ByteStringer.wrap(splitKey));
}
}
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1248,10 +1288,15 @@ public final class RequestConverter {
* @return a ModifyTableRequest
*/
public static ModifyTableRequest buildModifyTableRequest(
- final TableName tableName, final HTableDescriptor hTableDesc) {
+ final TableName tableName,
+ final HTableDescriptor hTableDesc,
+ final long nonceGroup,
+ final long nonce) {
ModifyTableRequest.Builder builder = ModifyTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
builder.setTableSchema(hTableDesc.convert());
+ builder.setNonceGroup(nonceGroup);
+ builder.setNonce(nonce);
return builder.build();
}
@@ -1353,7 +1398,9 @@ public final class RequestConverter {
* @param synchronous
* @return a SetBalancerRunningRequest
*/
- public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) {
+ public static SetBalancerRunningRequest buildSetBalancerRunningRequest(
+ boolean on,
+ boolean synchronous) {
return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
new file mode 100644
index 0000000..9c7c72a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+ /**
+ * This implementation is not smart and just treats nonce group and nonce as random bits.
+ */
+ // TODO: we could use pure byte arrays, but then we wouldn't be able to use hash map.
+@InterfaceAudience.Private
+public class NonceKey {
+ private long group;
+ private long nonce;
+
+ public NonceKey(long group, long nonce) {
+ assert nonce != HConstants.NO_NONCE;
+ this.group = group;
+ this.nonce = nonce;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof NonceKey)) {
+ return false;
+ }
+ NonceKey nk = ((NonceKey)obj);
+ return this.nonce == nk.nonce && this.group == nk.group;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)((group >> 32) ^ group ^ (nonce >> 32) ^ nonce);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + group + ":" + nonce + "]";
+ }
+
+ public long getNonceGroup() {
+ return group;
+ }
+
+ public long getNonce() {
+ return nonce;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 338fcad..e5da8b4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -78,6 +80,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
private RemoteProcedureException exception = null;
private byte[] result = null;
+ private NonceKey nonceKey = null;
+
/**
* The main code of the procedure. It must be idempotent since execute()
* may be called multiple time in case of machine failure in the middle
@@ -237,6 +241,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return parentProcId;
}
+ public NonceKey getNonceKey() {
+ return nonceKey;
+ }
+
/**
* @return true if the procedure has failed.
* true may mean failed but not yet rolledback or failed and rolledback.
@@ -389,6 +397,15 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
+ * Called by the ProcedureExecutor to set the value to the newly created procedure.
+ */
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ protected void setNonceKey(final NonceKey nonceKey) {
+ this.nonceKey = nonceKey;
+ }
+
+ /**
* Internal method called by the ProcedureExecutor that starts the
* user-level code execute().
*/
@@ -621,6 +638,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
builder.setStateData(stateStream.toByteString());
}
+ if (proc.getNonceKey() != null) {
+ builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
+ builder.setNonce(proc.getNonceKey().getNonce());
+ }
+
return builder.build();
}
@@ -672,9 +694,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
proc.setResult(proto.getResult().toByteArray());
}
+ if (proto.getNonce() != HConstants.NO_NONCE) {
+ NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
+ proc.setNonceKey(nonceKey);
+ }
+
// we want to call deserialize even when the stream is empty, mainly for testing.
proc.deserializeStateData(proto.getStateData().newInput());
return proc;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index d615429..13f6b1a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions;
@@ -134,14 +136,17 @@ public class ProcedureExecutor<TEnvironment> {
private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
private final Map<Long, ProcedureResult> completed;
+ private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
private final Configuration conf;
public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
- final Map<Long, ProcedureResult> completedMap) {
+ final Map<Long, ProcedureResult> completedMap,
+ final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
// set the timeout interval that triggers the periodic-procedure
setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
this.completed = completedMap;
+ this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
this.store = store;
this.conf = conf;
}
@@ -171,6 +176,11 @@ public class ProcedureExecutor<TEnvironment> {
}
store.delete(entry.getKey());
it.remove();
+
+ NonceKey nonceKey = result.getNonceKey();
+ if (nonceKey != null) {
+ nonceKeysToProcIdsMap.remove(nonceKey);
+ }
}
}
}
@@ -225,6 +235,13 @@ public class ProcedureExecutor<TEnvironment> {
new ConcurrentHashMap<Long, Procedure>();
/**
+ * Helper map to lookup whether the procedure already issued from the same client.
+ * This map contains every root procedure.
+ */
+ private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
+ new ConcurrentHashMap<NonceKey, Long>();
+
+ /**
* Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
* or periodic procedures.
*/
@@ -292,6 +309,12 @@ public class ProcedureExecutor<TEnvironment> {
if (!proc.hasParent() && !proc.isFinished()) {
rollbackStack.put(proc.getProcId(), new RootProcedureState());
}
+
+ // add the nonce to the map
+ if (proc.getNonceKey() != null) {
+ nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
+ }
+
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
}
@@ -317,6 +340,7 @@ public class ProcedureExecutor<TEnvironment> {
}
assert !rollbackStack.containsKey(proc.getProcId());
completed.put(proc.getProcId(), newResultFromProcedure(proc));
+
continue;
}
@@ -439,7 +463,8 @@ public class ProcedureExecutor<TEnvironment> {
}
// Add completed cleaner
- waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed));
+ waitingTimeout.add(
+ new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
}
public void stop() {
@@ -470,6 +495,7 @@ public class ProcedureExecutor<TEnvironment> {
completed.clear();
rollbackStack.clear();
procedures.clear();
+ nonceKeysToProcIdsMap.clear();
waitingTimeout.clear();
runnables.clear();
lastProcId.set(-1);
@@ -512,13 +538,53 @@ public class ProcedureExecutor<TEnvironment> {
* @return the procedure id, that can be used to monitor the operation
*/
public long submitProcedure(final Procedure proc) {
+ return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ /**
+ * Add a new root-procedure to the executor.
+ * @param proc the new procedure to execute.
+ * @param nonceGroup
+ * @param nonce
+ * @return the procedure id, that can be used to monitor the operation
+ */
+ public long submitProcedure(
+ final Procedure proc,
+ final long nonceGroup,
+ final long nonce) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(isRunning());
Preconditions.checkArgument(lastProcId.get() >= 0);
Preconditions.checkArgument(!proc.hasParent());
- // Initialize the Procedure ID
- proc.setProcId(nextProcId());
+ Long currentProcId;
+
+ // The following part of the code has to be synchronized to prevent multiple request
+ // with the same nonce to execute at the same time.
+ synchronized (this) {
+ // Check whether the proc exists. If exist, just return the proc id.
+ // This is to prevent the same proc to submit multiple times (it could happen
+ // when client could not talk to server and resubmit the same request).
+ NonceKey noncekey = null;
+ if (nonce != HConstants.NO_NONCE) {
+ noncekey = new NonceKey(nonceGroup, nonce);
+ currentProcId = nonceKeysToProcIdsMap.get(noncekey);
+ if (currentProcId != null) {
+ // Found the proc
+ return currentProcId;
+ }
+ }
+
+ // Initialize the Procedure ID
+ currentProcId = nextProcId();
+ proc.setProcId(currentProcId);
+
+ // This is new procedure. Set the noncekey and insert into the map.
+ if (noncekey != null) {
+ proc.setNonceKey(noncekey);
+ nonceKeysToProcIdsMap.put(noncekey, currentProcId);
+ }
+ } // end of synchronized (this)
// Commit the transaction
store.insert(proc, null);
@@ -528,14 +594,14 @@ public class ProcedureExecutor<TEnvironment> {
// Create the rollback stack for the procedure
RootProcedureState stack = new RootProcedureState();
- rollbackStack.put(proc.getProcId(), stack);
+ rollbackStack.put(currentProcId, stack);
// Submit the new subprocedures
- assert !procedures.containsKey(proc.getProcId());
- procedures.put(proc.getProcId(), proc);
- sendProcedureAddedNotification(proc.getProcId());
+ assert !procedures.containsKey(currentProcId);
+ procedures.put(currentProcId, proc);
+ sendProcedureAddedNotification(currentProcId);
runnables.addBack(proc);
- return proc.getProcId();
+ return currentProcId;
}
public ProcedureResult getResult(final long procId) {
@@ -1090,8 +1156,10 @@ public class ProcedureExecutor<TEnvironment> {
private static ProcedureResult newResultFromProcedure(final Procedure proc) {
if (proc.isFailed()) {
- return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException());
+ return new ProcedureResult(
+ proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getException());
}
- return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
+ return new ProcedureResult(
+ proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
index 98c293b..ff5407f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.NonceKey;
/**
* Once a Procedure completes the ProcedureExecutor takes all the useful
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureResult {
+ private final NonceKey nonceKey;
private final RemoteProcedureException exception;
private final long lastUpdate;
private final long startTime;
@@ -37,21 +39,39 @@ public class ProcedureResult {
private long clientAckTime = -1;
- public ProcedureResult(final long startTime, final long lastUpdate,
+ public ProcedureResult(
+ final NonceKey nonceKey,
+ final long startTime,
+ final long lastUpdate,
final RemoteProcedureException exception) {
- this.lastUpdate = lastUpdate;
- this.startTime = startTime;
- this.exception = exception;
- this.result = null;
+ this(nonceKey, exception, lastUpdate, startTime, null);
+ }
+
+ public ProcedureResult(
+ final NonceKey nonceKey,
+ final long startTime,
+ final long lastUpdate,
+ final byte[] result) {
+ this(nonceKey, null, lastUpdate, startTime, result);
}
- public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) {
+ public ProcedureResult(
+ final NonceKey nonceKey,
+ final RemoteProcedureException exception,
+ final long lastUpdate,
+ final long startTime,
+ final byte[] result) {
+ this.nonceKey = nonceKey;
+ this.exception = exception;
this.lastUpdate = lastUpdate;
this.startTime = startTime;
- this.exception = null;
this.result = result;
}
+ public NonceKey getNonceKey() {
+ return nonceKey;
+ }
+
public boolean isFailed() {
return exception != null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index da590b8..f172834 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
@@ -112,7 +113,13 @@ public class ProcedureTestingUtility {
}
public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
- long procId = procExecutor.submitProcedure(proc);
+ return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
+ final long nonceGroup,
+ final long nonce) {
+ long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce);
waitProcedure(procExecutor, procId);
return procId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index f21b6fa..e69faf5 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
@@ -76,6 +75,9 @@ public class TestProcedureRecovery {
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
procSleepInterval = 0;
+
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
}
@After
@@ -285,6 +287,41 @@ public class TestProcedureRecovery {
ProcedureTestingUtility.assertIsAbortException(result);
}
+ @Test(timeout=30000)
+ public void testCompletedProcWithSameNonce() throws Exception {
+ final long nonceGroup = 123;
+ final long nonce = 2222;
+ Procedure proc = new TestSingleStepProcedure();
+ // Submit a proc and wait for its completion
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+ // Restart
+ restart();
+ Procedure proc2 = new TestSingleStepProcedure();
+ // Submit a procedure with the same nonce and expect the same procedure would return.
+ long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+ assertTrue(procId == procId2);
+
+ ProcedureResult result = procExecutor.getResult(procId2);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ }
+
+ @Test(timeout=30000)
+ public void testRunningProcWithSameNonce() throws Exception {
+ final long nonceGroup = 456;
+ final long nonce = 33333;
+ Procedure proc = new TestMultiStepProcedure();
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+ // Restart
+ restart();
+ Procedure proc2 = new TestMultiStepProcedure();
+ // Submit a procedure with the same nonce and expect the same procedure would return.
+ long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+ // The original proc is not completed and the new submission should have the same proc Id.
+ assertTrue(procId == procId2);
+ }
+
public static class TestStateMachineProcedure
extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
enum State { STATE_1, STATE_2, STATE_3, DONE }