You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/08/16 18:06:34 UTC
hadoop git commit: HDDS-179. CloseContainer/PutKey command should be
syncronized with write operations. Contributed by Shashikant Banerjee.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0e832e7a7 -> 5ef29087a
HDDS-179. CloseContainer/PutKey command should be syncronized with write operations. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ef29087
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ef29087
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ef29087
Branch: refs/heads/trunk
Commit: 5ef29087ad27f4f6b815dbc08ea7427d14df58e1
Parents: 0e832e7
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Thu Aug 16 23:35:19 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Thu Aug 16 23:35:19 2018 +0530
----------------------------------------------------------------------
.../server/ratis/ContainerStateMachine.java | 323 +++++++++++++++----
.../server/TestContainerStateMachine.java | 201 ++++++++++++
2 files changed, 467 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef29087/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 15e991a..52ea3aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.ratis.protocol.RaftGroupId;
@@ -52,6 +53,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
@@ -95,6 +99,13 @@ import java.util.concurrent.ThreadPoolExecutor;
* {@link #applyTransaction} need to be enforced in the StateMachine
* implementation. For example, synchronization between writeChunk and
* createContainer in {@link ContainerStateMachine}.
+ *
+ * PutKey is synchronized with WriteChunk operations, PutKey for a block is
+ * executed only after all the WriteChunk preceding the PutKey have finished.
+ *
+ * CloseContainer is synchronized with WriteChunk and PutKey operations,
+ * CloseContainer for a container is processed after all the preceding write
+ * operations for the container have finished.
* */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger(
@@ -105,15 +116,14 @@ public class ContainerStateMachine extends BaseStateMachine {
private ThreadPoolExecutor chunkExecutor;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
- private final ConcurrentHashMap<Long, CompletableFuture<Message>>
- createContainerFutureMap;
+ private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
- ContainerStateMachine(ContainerDispatcher dispatcher,
+ public ContainerStateMachine(ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor) {
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
- this.createContainerFutureMap = new ConcurrentHashMap<>();
+ this.stateMachineMap = new ConcurrentHashMap<>();
}
@Override
@@ -203,32 +213,6 @@ public class ContainerStateMachine extends BaseStateMachine {
return dispatchCommand(requestProto)::toByteString;
}
- private CompletableFuture<Message> handleWriteChunk(
- ContainerCommandRequestProto requestProto, long entryIndex) {
- final WriteChunkRequestProto write = requestProto.getWriteChunk();
- long containerID = write.getBlockID().getContainerID();
- CompletableFuture<Message> future =
- createContainerFutureMap.get(containerID);
- CompletableFuture<Message> writeChunkFuture;
- if (future != null) {
- writeChunkFuture = future.thenApplyAsync(
- v -> runCommand(requestProto), chunkExecutor);
- } else {
- writeChunkFuture = CompletableFuture.supplyAsync(
- () -> runCommand(requestProto), chunkExecutor);
- }
- writeChunkFutureMap.put(entryIndex, writeChunkFuture);
- return writeChunkFuture;
- }
-
- private CompletableFuture<Message> handleCreateContainer(
- ContainerCommandRequestProto requestProto) {
- long containerID = requestProto.getContainerID();
- createContainerFutureMap.
- computeIfAbsent(containerID, k -> new CompletableFuture<>());
- return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
- }
-
/*
* writeStateMachineData calls are not synchronized with each other
* and also with applyTransaction.
@@ -239,15 +223,17 @@ public class ContainerStateMachine extends BaseStateMachine {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
Type cmdType = requestProto.getCmdType();
- switch (cmdType) {
- case CreateContainer:
- return handleCreateContainer(requestProto);
- case WriteChunk:
- return handleWriteChunk(requestProto, entry.getIndex());
- default:
- throw new IllegalStateException("Cmd Type:" + cmdType
- + " should not have state machine data");
+ long containerId = requestProto.getContainerID();
+ stateMachineMap
+ .computeIfAbsent(containerId, k -> new StateMachineHelper());
+ CompletableFuture<Message> stateMachineFuture =
+ stateMachineMap.get(containerId)
+ .handleStateMachineData(requestProto, entry.getIndex());
+ if (stateMachineFuture == null) {
+ throw new IllegalStateException(
+ "Cmd Type:" + cmdType + " should not have state machine data");
}
+ return stateMachineFuture;
} catch (IOException e) {
return completeExceptionally(e);
}
@@ -363,25 +349,13 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
- Type cmdType = requestProto.getCmdType();
-
- if (cmdType == Type.WriteChunk) {
- WriteChunkRequestProto write = requestProto.getWriteChunk();
- // the data field has already been removed in start Transaction
- Preconditions.checkArgument(!write.hasData());
- CompletableFuture<Message> stateMachineFuture =
- writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
- return stateMachineFuture
- .thenComposeAsync(v ->
- CompletableFuture.completedFuture(runCommand(requestProto)));
- } else {
- Message message = runCommand(requestProto);
- if (cmdType == Type.CreateContainer) {
- long containerID = requestProto.getContainerID();
- createContainerFutureMap.remove(containerID).complete(message);
- }
- return CompletableFuture.completedFuture(message);
- }
+ Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
+ stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
+ k -> new StateMachineHelper());
+ long index =
+ trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
+ return stateMachineMap.get(requestProto.getContainerID())
+ .executeContainerCommand(requestProto, index);
} catch (IOException e) {
return completeExceptionally(e);
}
@@ -396,4 +370,239 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void close() throws IOException {
}
+
+ /**
+ * Class to manage the future tasks for writeChunks.
+ */
+ static class CommitChunkFutureMap {
+ private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+ block2ChunkMap = new ConcurrentHashMap<>();
+
+ synchronized int removeAndGetSize(long index) {
+ block2ChunkMap.remove(index);
+ return block2ChunkMap.size();
+ }
+
+ synchronized CompletableFuture<Message> add(long index,
+ CompletableFuture<Message> future) {
+ return block2ChunkMap.put(index, future);
+ }
+
+ synchronized List<CompletableFuture<Message>> getAll() {
+ return new ArrayList<>(block2ChunkMap.values());
+ }
+ }
+
+ /**
+ * This class maintains maps and provide utilities to enforce synchronization
+ * among createContainer, writeChunk, putKey and closeContainer.
+ */
+ private class StateMachineHelper {
+
+ private CompletableFuture<Message> createContainerFuture;
+
+ // Map for maintaining all writeChunk futures mapped to blockId
+ private final ConcurrentHashMap<Long, CommitChunkFutureMap>
+ block2ChunkMap;
+
+ // Map for putKey futures
+ private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+ blockCommitMap;
+
+ StateMachineHelper() {
+ createContainerFuture = null;
+ block2ChunkMap = new ConcurrentHashMap<>();
+ blockCommitMap = new ConcurrentHashMap<>();
+ }
+
+ // The following section handles writeStateMachineData transactions
+ // on a container
+
+ // enqueue the create container future during writeStateMachineData
+ // so that the write stateMachine data phase of writeChunk wait on
+ // create container to finish.
+ private CompletableFuture<Message> handleCreateContainer() {
+ createContainerFuture = new CompletableFuture<>();
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ }
+
+ // This synchronizes on create container to finish
+ private CompletableFuture<Message> handleWriteChunk(
+ ContainerCommandRequestProto requestProto, long entryIndex) {
+ CompletableFuture<Message> containerOpFuture;
+
+ if (createContainerFuture != null) {
+ containerOpFuture = createContainerFuture
+ .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
+ } else {
+ containerOpFuture = CompletableFuture
+ .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
+ }
+ writeChunkFutureMap.put(entryIndex, containerOpFuture);
+ return containerOpFuture;
+ }
+
+ CompletableFuture<Message> handleStateMachineData(
+ final ContainerCommandRequestProto requestProto, long index) {
+ Type cmdType = requestProto.getCmdType();
+ if (cmdType == Type.CreateContainer) {
+ return handleCreateContainer();
+ } else if (cmdType == Type.WriteChunk) {
+ return handleWriteChunk(requestProto, index);
+ } else {
+ return null;
+ }
+ }
+
+ // The following section handles applyTransaction transactions
+ // on a container
+
+ private CompletableFuture<Message> handlePutKey(
+ ContainerCommandRequestProto requestProto) {
+ List<CompletableFuture<Message>> futureList = new ArrayList<>();
+ long localId =
+ requestProto.getPutKey().getKeyData().getBlockID().getLocalID();
+ // Need not wait for create container future here as it has already
+ // finished.
+ if (block2ChunkMap.get(localId) != null) {
+ futureList.addAll(block2ChunkMap.get(localId).getAll());
+ }
+ CompletableFuture<Message> effectiveFuture =
+ runCommandAfterFutures(futureList, requestProto);
+
+ CompletableFuture<Message> putKeyFuture =
+ effectiveFuture.thenApply(message -> {
+ blockCommitMap.remove(localId);
+ return message;
+ });
+ blockCommitMap.put(localId, putKeyFuture);
+ return putKeyFuture;
+ }
+
+ // Close Container should be executed only if all pending WriteType
+ // container cmds get executed. Transactions which can return a future
+ // are WriteChunk and PutKey.
+ private CompletableFuture<Message> handleCloseContainer(
+ ContainerCommandRequestProto requestProto) {
+ List<CompletableFuture<Message>> futureList = new ArrayList<>();
+
+ // No need to wait for create container future here as it should have
+ // already finished.
+ block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
+ futureList.addAll(blockCommitMap.values());
+
+ // There are pending write Chunk/PutKey type requests
+ // Queue this closeContainer request behind all these requests
+ CompletableFuture<Message> closeContainerFuture =
+ runCommandAfterFutures(futureList, requestProto);
+
+ return closeContainerFuture.thenApply(message -> {
+ stateMachineMap.remove(requestProto.getContainerID());
+ return message;
+ });
+ }
+
+ private CompletableFuture<Message> handleChunkCommit(
+ ContainerCommandRequestProto requestProto, long index) {
+ WriteChunkRequestProto write = requestProto.getWriteChunk();
+ // the data field has already been removed in start Transaction
+ Preconditions.checkArgument(!write.hasData());
+ CompletableFuture<Message> stateMachineFuture =
+ writeChunkFutureMap.remove(index);
+ CompletableFuture<Message> commitChunkFuture = stateMachineFuture
+ .thenComposeAsync(v -> CompletableFuture
+ .completedFuture(runCommand(requestProto)));
+
+ long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
+ // Put the applyTransaction Future again to the Map.
+ // closeContainer should synchronize with this.
+ block2ChunkMap
+ .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
+ .add(index, commitChunkFuture);
+ return commitChunkFuture.thenApply(message -> {
+ block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
+ -> chunks.removeAndGetSize(index) == 0? null: chunks);
+ return message;
+ });
+ }
+
+ private CompletableFuture<Message> runCommandAfterFutures(
+ List<CompletableFuture<Message>> futureList,
+ ContainerCommandRequestProto requestProto) {
+ CompletableFuture<Message> effectiveFuture;
+ if (futureList.isEmpty()) {
+ effectiveFuture = CompletableFuture
+ .supplyAsync(() -> runCommand(requestProto));
+
+ } else {
+ CompletableFuture<Void> allFuture = CompletableFuture.allOf(
+ futureList.toArray(new CompletableFuture[futureList.size()]));
+ effectiveFuture = allFuture
+ .thenApplyAsync(v -> runCommand(requestProto));
+ }
+ return effectiveFuture;
+ }
+
+ CompletableFuture<Message> handleCreateContainer(
+ ContainerCommandRequestProto requestProto) {
+ CompletableFuture<Message> future =
+ CompletableFuture.completedFuture(runCommand(requestProto));
+ future.thenAccept(m -> {
+ createContainerFuture.complete(m);
+ createContainerFuture = null;
+ });
+ return future;
+ }
+
+ CompletableFuture<Message> handleOtherCommands(
+ ContainerCommandRequestProto requestProto) {
+ return CompletableFuture.completedFuture(runCommand(requestProto));
+ }
+
+ CompletableFuture<Message> executeContainerCommand(
+ ContainerCommandRequestProto requestProto, long index) {
+ Type cmdType = requestProto.getCmdType();
+ switch (cmdType) {
+ case WriteChunk:
+ return handleChunkCommit(requestProto, index);
+ case CloseContainer:
+ return handleCloseContainer(requestProto);
+ case PutKey:
+ return handlePutKey(requestProto);
+ case CreateContainer:
+ return handleCreateContainer(requestProto);
+ default:
+ return handleOtherCommands(requestProto);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
+ return stateMachineMap;
+ }
+
+ @VisibleForTesting
+ public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
+ StateMachineHelper helper = stateMachineMap.get(containerId);
+ return helper == null ? null : helper.createContainerFuture;
+ }
+
+ @VisibleForTesting
+ public List<CompletableFuture<Message>> getCommitChunkFutureMap(
+ long containerId) {
+ StateMachineHelper helper = stateMachineMap.get(containerId);
+ if (helper != null) {
+ List<CompletableFuture<Message>> futureList = new ArrayList<>();
+ stateMachineMap.get(containerId).block2ChunkMap.values()
+ .forEach(b -> futureList.addAll(b.getAll()));
+ return futureList;
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
+ return writeChunkFutureMap.values();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef29087/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
new file mode 100644
index 0000000..448742e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.server;
+
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+ .ContainerStateMachine;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.ProtoUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class tests ContainerStateMachine.
+ */
+public class TestContainerStateMachine {
+
+ private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+ private static long nextCallId() {
+ return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+ }
+
+ private ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1024),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ private ContainerStateMachine stateMachine =
+ new ContainerStateMachine(new TestContainerDispatcher(), executor);
+
+
+ @Test
+ public void testCloseContainerSynchronization() throws Exception {
+ Pipeline pipeline = ContainerTestHelper.createPipeline(3);
+ long containerId = new Random().nextLong();
+
+ //create container request
+ RaftClientRequest createContainer = getRaftClientRequest(
+ ContainerTestHelper.getCreateContainerRequest(containerId, pipeline));
+
+ ContainerCommandRequestProto writeChunkProto = ContainerTestHelper
+ .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()),
+ 1024);
+
+ RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto);
+
+ // add putKey request
+ ContainerCommandRequestProto putKeyProto = ContainerTestHelper
+ .getPutKeyRequest(pipeline, writeChunkProto.getWriteChunk());
+ RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
+
+ TransactionContext createContainerCtxt =
+ startAndWriteStateMachineData(createContainer);
+ // Start and Write into the StateMachine
+ TransactionContext writeChunkcontext =
+ startAndWriteStateMachineData(writeChunkRequest);
+
+ TransactionContext putKeyContext =
+ stateMachine.startTransaction(putKeyRequest);
+ Assert.assertEquals(1, stateMachine.getStateMachineMap().size());
+ Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId));
+ Assert.assertEquals(1,
+ stateMachine.getWriteChunkFutureMap().size());
+ Assert.assertTrue(
+ stateMachine.getCommitChunkFutureMap(containerId).isEmpty());
+
+ //Add a closeContainerRequest
+ RaftClientRequest closeRequest = getRaftClientRequest(
+ ContainerTestHelper.getCloseContainer(pipeline, containerId));
+
+ TransactionContext closeCtx = stateMachine.startTransaction(closeRequest);
+
+ // Now apply all the transaction for the CreateContainer Command.
+ // This will unblock writeChunks as well
+
+ stateMachine.applyTransaction(createContainerCtxt);
+ stateMachine.applyTransaction(writeChunkcontext);
+ CompletableFuture<Message> putKeyFuture =
+ stateMachine.applyTransaction(putKeyContext);
+ waitForTransactionCompletion(putKeyFuture);
+ // Make sure the putKey transaction complete
+ Assert.assertTrue(putKeyFuture.isDone());
+
+ // Execute the closeContainer. This should ensure all prior Write Type
+ // container requests finish execution
+
+ CompletableFuture<Message> closeFuture =
+ stateMachine.applyTransaction(closeCtx);
+ waitForTransactionCompletion(closeFuture);
+ // Make sure the closeContainer transaction complete
+ Assert.assertTrue(closeFuture.isDone());
+ Assert.assertNull(stateMachine.getCreateContainerFuture(containerId));
+ Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId));
+
+ }
+
+ private RaftClientRequest getRaftClientRequest(
+ ContainerCommandRequestProto req) throws IOException {
+ ClientId clientId = ClientId.randomId();
+ return new RaftClientRequest(clientId,
+ RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()),
+ RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
+ Message.valueOf(req.toByteString()), RaftClientRequest
+ .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY));
+ }
+
+ private void waitForTransactionCompletion(
+ CompletableFuture<Message> future) throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService
+ .invokeAll(Collections.singleton(future::get), 10,
+ TimeUnit.SECONDS); // Timeout of 10 minutes.
+ executorService.shutdown();
+ }
+
+ private TransactionContext startAndWriteStateMachineData(
+ RaftClientRequest request) throws IOException {
+ TransactionContext ctx = stateMachine.startTransaction(request);
+ RaftProtos.LogEntryProto e = ProtoUtils
+ .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(),
+ request.getCallId(), ClientId.randomId(), request.getCallId());
+ ctx.setLogEntry(e);
+ stateMachine.writeStateMachineData(e);
+ return ctx;
+ }
+
+ // ContainerDispatcher for test only purpose.
+ private static class TestContainerDispatcher implements ContainerDispatcher {
+ /**
+ * Dispatches commands to container layer.
+ *
+ * @param msg - Command Request
+ * @return Command Response
+ */
+ @Override
+ public ContainerCommandResponseProto dispatch(
+ ContainerCommandRequestProto msg) {
+ return ContainerTestHelper.getCreateContainerResponse(msg);
+ }
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public void setScmId(String scmId) {
+ }
+
+ @Override
+ public Handler getHandler(ContainerType containerType) {
+ return null;
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org