You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/07 08:59:52 UTC
[hadoop] branch ozone-0.4 updated: HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push:
new 8655abb HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.
8655abb is described below
commit 8655abb353811eba349dd0703d702deb02532242
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Wed Mar 6 10:00:16 2019 +0530
HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.
(cherry picked from commit 62e89dc275f120f54967744393e2ddde15575096)
---
.../src/main/proto/DatanodeContainerProtocol.proto | 1 +
.../server/ratis/ContainerStateMachine.java | 60 ++++++++++++----------
2 files changed, 33 insertions(+), 28 deletions(-)
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 3b78835..7396eb3 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -143,6 +143,7 @@ enum Result {
BCSID_MISMATCH = 38;
CONTAINER_NOT_OPEN = 39;
CONTAINER_MISSING = 40;
+ BLOCK_TOKEN_VERIFICATION_FAILED = 41;
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 759f957..ed7e099 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import io.opentracing.Scope;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
@@ -350,13 +352,20 @@ public class ContainerStateMachine extends BaseStateMachine {
}
private ContainerCommandResponseProto dispatchCommand(
- ContainerCommandRequestProto requestProto,
- DispatcherContext context) throws IOException {
+ ContainerCommandRequestProto requestProto, DispatcherContext context) {
LOG.trace("dispatch {}", requestProto);
- if(isBlockTokenEnabled) {
- // ServerInterceptors intercepts incoming request and creates ugi.
- tokenVerifier.verify(UserGroupInformation.getCurrentUser()
- .getShortUserName(), requestProto.getEncodedToken());
+ if (isBlockTokenEnabled) {
+ try {
+ // ServerInterceptors intercepts incoming request and creates ugi.
+ tokenVerifier
+ .verify(UserGroupInformation.getCurrentUser().getShortUserName(),
+ requestProto.getEncodedToken());
+ } catch (IOException ioe) {
+ StorageContainerException sce = new StorageContainerException(
+ "Block token verification failed. " + ioe.getMessage(), ioe,
+ ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+ return ContainerUtils.logAndReturnError(LOG, sce, requestProto);
+ }
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
@@ -365,7 +374,7 @@ public class ContainerStateMachine extends BaseStateMachine {
}
private Message runCommand(ContainerCommandRequestProto requestProto,
- DispatcherContext context) throws IOException {
+ DispatcherContext context) {
return dispatchCommand(requestProto, context)::toByteString;
}
@@ -394,14 +403,10 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setCreateContainerSet(createContainerSet)
.build();
- CompletableFuture<Message> writeChunkFuture;
- try {
- Message msg = runCommand(requestProto, context);
- writeChunkFuture = CompletableFuture
- .supplyAsync(() -> msg, chunkExecutor);
- }catch(IOException ie) {
- writeChunkFuture = completeExceptionally(ie);
- }
+ // ensure the write chunk happens asynchronously in writeChunkExecutor pool
+ // thread.
+ CompletableFuture<Message> writeChunkFuture = CompletableFuture
+ .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
@@ -567,16 +572,18 @@ public class ContainerStateMachine extends BaseStateMachine {
// readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) {
- CompletableFuture<ByteString> future = new CompletableFuture<>();
- return future.supplyAsync(() -> {
+ final CompletableFuture<ByteString> future = new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
try {
- return getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
- requestProto);
+ future.complete(
+ getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
+ requestProto));
} catch (ExecutionException e) {
future.completeExceptionally(e);
- return null;
}
+ return future;
}, chunkExecutor);
+ return future;
} else {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
@@ -627,7 +634,6 @@ public class ContainerStateMachine extends BaseStateMachine {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getStateMachineLogEntry().getLogData());
Type cmdType = requestProto.getCmdType();
- CompletableFuture<Message> future;
// Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) {
Preconditions
@@ -638,13 +644,11 @@ public class ContainerStateMachine extends BaseStateMachine {
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet);
}
- try {
- Message msg = runCommand(requestProto, builder.build());
- future = CompletableFuture.supplyAsync(() -> msg,
- getCommandExecutor(requestProto));
- } catch (IOException ie) {
- future = completeExceptionally(ie);
- }
+ // Ensure the command gets executed in a separate thread than
+ // stateMachineUpdater thread which is calling applyTransaction here.
+ CompletableFuture<Message> future = CompletableFuture
+ .supplyAsync(() -> runCommand(requestProto, builder.build()),
+ getCommandExecutor(requestProto));
lastIndex = index;
future.thenAccept(m -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org