You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/09/17 11:19:34 UTC
[hadoop] branch trunk updated: HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7f90731 HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)
7f90731 is described below
commit 7f9073132dcc9db157a6792635d2ed099f2ef0d2
Author: bshashikant <sh...@apache.org>
AuthorDate: Tue Sep 17 16:49:25 2019 +0530
HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)
---
.../container/common/impl/HddsDispatcher.java | 9 ++++---
.../server/ratis/ContainerStateMachine.java | 31 ++++++++++++++++------
2 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e95d899..37e19bc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -236,9 +236,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
// with initial BCSID recorded as 0.
- Preconditions
- .checkArgument(!container2BCSIDMap.containsKey(containerID));
- container2BCSIDMap.put(containerID, Long.valueOf(0));
+ container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
@@ -290,6 +288,11 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
// state here.
Result result = responseProto.getResult();
+ if (cmdType == ContainerProtos.Type.CreateContainer
+ && result == Result.SUCCESS && dispatcherContext != null) {
+ Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+ container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+ }
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
// If the container is open/closing and the container operation
// has failed, it should be first marked unhealthy and the initiate the
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index cee9741..c6ab0a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -435,13 +435,20 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
+ CompletableFuture<Message> raftFuture = new CompletableFuture<>();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
- CompletableFuture.supplyAsync(() ->
- runCommand(requestProto, context), chunkExecutor);
-
- CompletableFuture<Message> raftFuture = new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return runCommand(requestProto, context);
+ } catch (Exception e) {
+ LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId"
+ + write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ + write.getChunkData().getChunkName() + e);
+ raftFuture.completeExceptionally(e);
+ throw e;
+ }}, chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@@ -698,7 +705,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
- || cmdType == Type.PutBlock) {
+ || cmdType == Type.PutBlock || cmdType == Type.CreateContainer) {
builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
@@ -706,9 +713,17 @@ public class ContainerStateMachine extends BaseStateMachine {
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
CompletableFuture<ContainerCommandResponseProto> future =
- CompletableFuture.supplyAsync(
- () -> runCommand(requestProto, builder.build()),
- getCommandExecutor(requestProto));
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return runCommand(requestProto, builder.build());
+ } catch (Exception e) {
+ LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
+ + "{} exception {}", gid, requestProto.getCmdType(),
+ index, e);
+ applyTransactionFuture.completeExceptionally(e);
+ throw e;
+ }
+ }, getCommandExecutor(requestProto));
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER) {
long startTime = (long) trx.getStateMachineContext();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org