You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/08/16 18:06:34 UTC

hadoop git commit: HDDS-179. CloseContainer/PutKey command should be synchronized with write operations. Contributed by Shashikant Banerjee.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0e832e7a7 -> 5ef29087a


HDDS-179. CloseContainer/PutKey command should be synchronized with write operations. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ef29087
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ef29087
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ef29087

Branch: refs/heads/trunk
Commit: 5ef29087ad27f4f6b815dbc08ea7427d14df58e1
Parents: 0e832e7
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Thu Aug 16 23:35:19 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Thu Aug 16 23:35:19 2018 +0530

----------------------------------------------------------------------
 .../server/ratis/ContainerStateMachine.java     | 323 +++++++++++++++----
 .../server/TestContainerStateMachine.java       | 201 ++++++++++++
 2 files changed, 467 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef29087/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 15e991a..52ea3aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -52,6 +53,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -95,6 +99,13 @@ import java.util.concurrent.ThreadPoolExecutor;
  * {@link #applyTransaction} need to be enforced in the StateMachine
  * implementation. For example, synchronization between writeChunk and
  * createContainer in {@link ContainerStateMachine}.
+ *
+ * PutKey is synchronized with WriteChunk operations: PutKey for a block is
+ * executed only after all the WriteChunks preceding the PutKey have finished.
+ *
+ * CloseContainer is synchronized with WriteChunk and PutKey operations:
+ * CloseContainer for a container is processed after all the preceding write
+ * operations for the container have finished.
  * */
 public class ContainerStateMachine extends BaseStateMachine {
   static final Logger LOG = LoggerFactory.getLogger(
@@ -105,15 +116,14 @@ public class ContainerStateMachine extends BaseStateMachine {
   private ThreadPoolExecutor chunkExecutor;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      createContainerFutureMap;
+  private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
 
-  ContainerStateMachine(ContainerDispatcher dispatcher,
+  public ContainerStateMachine(ContainerDispatcher dispatcher,
       ThreadPoolExecutor chunkExecutor) {
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
-    this.createContainerFutureMap = new ConcurrentHashMap<>();
+    this.stateMachineMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -203,32 +213,6 @@ public class ContainerStateMachine extends BaseStateMachine {
     return dispatchCommand(requestProto)::toByteString;
   }
 
-  private CompletableFuture<Message> handleWriteChunk(
-      ContainerCommandRequestProto requestProto, long entryIndex) {
-    final WriteChunkRequestProto write = requestProto.getWriteChunk();
-    long containerID = write.getBlockID().getContainerID();
-    CompletableFuture<Message> future =
-        createContainerFutureMap.get(containerID);
-    CompletableFuture<Message> writeChunkFuture;
-    if (future != null) {
-      writeChunkFuture = future.thenApplyAsync(
-          v -> runCommand(requestProto), chunkExecutor);
-    } else {
-      writeChunkFuture = CompletableFuture.supplyAsync(
-          () -> runCommand(requestProto), chunkExecutor);
-    }
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
-    return writeChunkFuture;
-  }
-
-  private CompletableFuture<Message> handleCreateContainer(
-      ContainerCommandRequestProto requestProto) {
-    long containerID = requestProto.getContainerID();
-    createContainerFutureMap.
-        computeIfAbsent(containerID, k -> new CompletableFuture<>());
-    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-  }
-
   /*
    * writeStateMachineData calls are not synchronized with each other
    * and also with applyTransaction.
@@ -239,15 +223,17 @@ public class ContainerStateMachine extends BaseStateMachine {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
       Type cmdType = requestProto.getCmdType();
-      switch (cmdType) {
-      case CreateContainer:
-        return handleCreateContainer(requestProto);
-      case WriteChunk:
-        return handleWriteChunk(requestProto, entry.getIndex());
-      default:
-        throw new IllegalStateException("Cmd Type:" + cmdType
-            + " should not have state machine data");
+      long containerId = requestProto.getContainerID();
+      stateMachineMap
+          .computeIfAbsent(containerId, k -> new StateMachineHelper());
+      CompletableFuture<Message> stateMachineFuture =
+          stateMachineMap.get(containerId)
+              .handleStateMachineData(requestProto, entry.getIndex());
+      if (stateMachineFuture == null) {
+        throw new IllegalStateException(
+            "Cmd Type:" + cmdType + " should not have state machine data");
       }
+      return stateMachineFuture;
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -363,25 +349,13 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
-      Type cmdType = requestProto.getCmdType();
-
-      if (cmdType == Type.WriteChunk) {
-        WriteChunkRequestProto write = requestProto.getWriteChunk();
-        // the data field has already been removed in start Transaction
-        Preconditions.checkArgument(!write.hasData());
-        CompletableFuture<Message> stateMachineFuture =
-            writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
-        return stateMachineFuture
-            .thenComposeAsync(v ->
-                CompletableFuture.completedFuture(runCommand(requestProto)));
-      } else {
-        Message message = runCommand(requestProto);
-        if (cmdType == Type.CreateContainer) {
-          long containerID = requestProto.getContainerID();
-          createContainerFutureMap.remove(containerID).complete(message);
-        }
-        return CompletableFuture.completedFuture(message);
-      }
+      Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
+      stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
+          k -> new StateMachineHelper());
+      long index =
+          trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
+      return stateMachineMap.get(requestProto.getContainerID())
+          .executeContainerCommand(requestProto, index);
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -396,4 +370,239 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public void close() throws IOException {
   }
+
+  /**
+   * Class to manage the future tasks for writeChunks.
+   */
+  static class CommitChunkFutureMap {
+    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+        block2ChunkMap = new ConcurrentHashMap<>();
+
+    synchronized int removeAndGetSize(long index) {
+      block2ChunkMap.remove(index);
+      return block2ChunkMap.size();
+    }
+
+    synchronized CompletableFuture<Message> add(long index,
+        CompletableFuture<Message> future) {
+      return block2ChunkMap.put(index, future);
+    }
+
+    synchronized List<CompletableFuture<Message>> getAll() {
+      return new ArrayList<>(block2ChunkMap.values());
+    }
+  }
+
+  /**
+   * This class maintains maps and provides utilities to enforce synchronization
+   * among createContainer, writeChunk, putKey and closeContainer.
+   */
+  private class StateMachineHelper {
+
+    private CompletableFuture<Message> createContainerFuture;
+
+    // Map for maintaining all writeChunk futures mapped to blockId
+    private final ConcurrentHashMap<Long, CommitChunkFutureMap>
+        block2ChunkMap;
+
+    // Map for putKey futures
+    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+        blockCommitMap;
+
+    StateMachineHelper() {
+      createContainerFuture = null;
+      block2ChunkMap = new ConcurrentHashMap<>();
+      blockCommitMap = new ConcurrentHashMap<>();
+    }
+
+    // The following section handles writeStateMachineData transactions
+    // on a container
+
+    // Enqueue the create container future during writeStateMachineData
+    // so that the writeStateMachineData phase of writeChunk waits for
+    // create container to finish.
+    private CompletableFuture<Message> handleCreateContainer() {
+      createContainerFuture = new CompletableFuture<>();
+      return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+    }
+
+    // This synchronizes on create container to finish
+    private CompletableFuture<Message> handleWriteChunk(
+        ContainerCommandRequestProto requestProto, long entryIndex) {
+      CompletableFuture<Message> containerOpFuture;
+
+      if (createContainerFuture != null) {
+        containerOpFuture = createContainerFuture
+            .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
+      } else {
+        containerOpFuture = CompletableFuture
+            .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
+      }
+      writeChunkFutureMap.put(entryIndex, containerOpFuture);
+      return containerOpFuture;
+    }
+
+    CompletableFuture<Message> handleStateMachineData(
+        final ContainerCommandRequestProto requestProto, long index) {
+      Type cmdType = requestProto.getCmdType();
+      if (cmdType == Type.CreateContainer) {
+        return handleCreateContainer();
+      } else if (cmdType == Type.WriteChunk) {
+        return handleWriteChunk(requestProto, index);
+      } else {
+        return null;
+      }
+    }
+
+    // The following section handles applyTransaction transactions
+    // on a container
+
+    private CompletableFuture<Message> handlePutKey(
+        ContainerCommandRequestProto requestProto) {
+      List<CompletableFuture<Message>> futureList = new ArrayList<>();
+      long localId =
+          requestProto.getPutKey().getKeyData().getBlockID().getLocalID();
+      // Need not wait for create container future here as it has already
+      // finished.
+      if (block2ChunkMap.get(localId) != null) {
+        futureList.addAll(block2ChunkMap.get(localId).getAll());
+      }
+      CompletableFuture<Message> effectiveFuture =
+          runCommandAfterFutures(futureList, requestProto);
+
+      CompletableFuture<Message> putKeyFuture =
+          effectiveFuture.thenApply(message -> {
+            blockCommitMap.remove(localId);
+            return message;
+          });
+      blockCommitMap.put(localId, putKeyFuture);
+      return putKeyFuture;
+    }
+
+    // Close Container should be executed only if all pending WriteType
+    // container cmds get executed. Transactions which can return a future
+    // are WriteChunk and PutKey.
+    private CompletableFuture<Message> handleCloseContainer(
+        ContainerCommandRequestProto requestProto) {
+      List<CompletableFuture<Message>> futureList = new ArrayList<>();
+
+      // No need to wait for create container future here as it should have
+      // already finished.
+      block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
+      futureList.addAll(blockCommitMap.values());
+
+      // There are pending write Chunk/PutKey type requests
+      // Queue this closeContainer request behind all these requests
+      CompletableFuture<Message> closeContainerFuture =
+          runCommandAfterFutures(futureList, requestProto);
+
+      return closeContainerFuture.thenApply(message -> {
+        stateMachineMap.remove(requestProto.getContainerID());
+        return message;
+      });
+    }
+
+    private CompletableFuture<Message> handleChunkCommit(
+        ContainerCommandRequestProto requestProto, long index) {
+      WriteChunkRequestProto write = requestProto.getWriteChunk();
+      // the data field has already been removed in start Transaction
+      Preconditions.checkArgument(!write.hasData());
+      CompletableFuture<Message> stateMachineFuture =
+          writeChunkFutureMap.remove(index);
+      CompletableFuture<Message> commitChunkFuture = stateMachineFuture
+          .thenComposeAsync(v -> CompletableFuture
+              .completedFuture(runCommand(requestProto)));
+
+      long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
+      // Put the applyTransaction Future again to the Map.
+      // closeContainer should synchronize with this.
+      block2ChunkMap
+          .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
+          .add(index, commitChunkFuture);
+      return commitChunkFuture.thenApply(message -> {
+        block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
+            -> chunks.removeAndGetSize(index) == 0? null: chunks);
+        return message;
+      });
+    }
+
+    private CompletableFuture<Message> runCommandAfterFutures(
+        List<CompletableFuture<Message>> futureList,
+        ContainerCommandRequestProto requestProto) {
+      CompletableFuture<Message> effectiveFuture;
+      if (futureList.isEmpty()) {
+        effectiveFuture = CompletableFuture
+            .supplyAsync(() -> runCommand(requestProto));
+
+      } else {
+        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
+            futureList.toArray(new CompletableFuture[futureList.size()]));
+        effectiveFuture = allFuture
+            .thenApplyAsync(v -> runCommand(requestProto));
+      }
+      return effectiveFuture;
+    }
+
+    CompletableFuture<Message> handleCreateContainer(
+        ContainerCommandRequestProto requestProto) {
+      CompletableFuture<Message> future =
+          CompletableFuture.completedFuture(runCommand(requestProto));
+      future.thenAccept(m -> {
+        createContainerFuture.complete(m);
+        createContainerFuture = null;
+      });
+      return future;
+    }
+
+    CompletableFuture<Message> handleOtherCommands(
+        ContainerCommandRequestProto requestProto) {
+      return CompletableFuture.completedFuture(runCommand(requestProto));
+    }
+
+    CompletableFuture<Message> executeContainerCommand(
+        ContainerCommandRequestProto requestProto, long index) {
+      Type cmdType = requestProto.getCmdType();
+      switch (cmdType) {
+      case WriteChunk:
+        return handleChunkCommit(requestProto, index);
+      case CloseContainer:
+        return handleCloseContainer(requestProto);
+      case PutKey:
+        return handlePutKey(requestProto);
+      case CreateContainer:
+        return handleCreateContainer(requestProto);
+      default:
+        return handleOtherCommands(requestProto);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
+    return stateMachineMap;
+  }
+
+  @VisibleForTesting
+  public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
+    StateMachineHelper helper = stateMachineMap.get(containerId);
+    return helper == null ? null : helper.createContainerFuture;
+  }
+
+  @VisibleForTesting
+  public List<CompletableFuture<Message>> getCommitChunkFutureMap(
+      long containerId) {
+    StateMachineHelper helper = stateMachineMap.get(containerId);
+    if (helper != null) {
+      List<CompletableFuture<Message>> futureList = new ArrayList<>();
+      stateMachineMap.get(containerId).block2ChunkMap.values()
+          .forEach(b -> futureList.addAll(b.getAll()));
+      return futureList;
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
+    return writeChunkFutureMap.values();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef29087/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
new file mode 100644
index 0000000..448742e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.server;
+
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .ContainerStateMachine;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.ProtoUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class tests ContainerStateMachine.
+ */
+public class TestContainerStateMachine {
+
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+  private static long nextCallId() {
+    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  private ThreadPoolExecutor executor =
+      new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(1024),
+          new ThreadPoolExecutor.CallerRunsPolicy());
+  private ContainerStateMachine stateMachine =
+      new ContainerStateMachine(new TestContainerDispatcher(), executor);
+
+
+  @Test
+  public void testCloseContainerSynchronization() throws Exception {
+    Pipeline pipeline = ContainerTestHelper.createPipeline(3);
+    long containerId = new Random().nextLong();
+
+    //create container request
+    RaftClientRequest createContainer = getRaftClientRequest(
+        ContainerTestHelper.getCreateContainerRequest(containerId, pipeline));
+
+    ContainerCommandRequestProto writeChunkProto = ContainerTestHelper
+        .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()),
+            1024);
+
+    RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto);
+
+    // add putKey request
+    ContainerCommandRequestProto putKeyProto = ContainerTestHelper
+            .getPutKeyRequest(pipeline, writeChunkProto.getWriteChunk());
+    RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
+
+    TransactionContext createContainerCtxt =
+        startAndWriteStateMachineData(createContainer);
+    // Start and Write into the StateMachine
+    TransactionContext writeChunkcontext =
+        startAndWriteStateMachineData(writeChunkRequest);
+
+    TransactionContext putKeyContext =
+        stateMachine.startTransaction(putKeyRequest);
+    Assert.assertEquals(1, stateMachine.getStateMachineMap().size());
+    Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId));
+    Assert.assertEquals(1,
+        stateMachine.getWriteChunkFutureMap().size());
+    Assert.assertTrue(
+        stateMachine.getCommitChunkFutureMap(containerId).isEmpty());
+
+    //Add a closeContainerRequest
+    RaftClientRequest closeRequest = getRaftClientRequest(
+        ContainerTestHelper.getCloseContainer(pipeline, containerId));
+
+    TransactionContext closeCtx = stateMachine.startTransaction(closeRequest);
+
+    // Now apply all the transaction for the CreateContainer Command.
+    // This will unblock writeChunks as well
+
+    stateMachine.applyTransaction(createContainerCtxt);
+    stateMachine.applyTransaction(writeChunkcontext);
+    CompletableFuture<Message> putKeyFuture =
+        stateMachine.applyTransaction(putKeyContext);
+    waitForTransactionCompletion(putKeyFuture);
+    // Make sure the putKey transaction completes
+    Assert.assertTrue(putKeyFuture.isDone());
+
+    // Execute the closeContainer. This should ensure all prior Write Type
+    // container requests finish execution
+
+    CompletableFuture<Message> closeFuture =
+        stateMachine.applyTransaction(closeCtx);
+    waitForTransactionCompletion(closeFuture);
+    // Make sure the closeContainer transaction completes
+    Assert.assertTrue(closeFuture.isDone());
+    Assert.assertNull(stateMachine.getCreateContainerFuture(containerId));
+    Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId));
+
+  }
+
+  private RaftClientRequest getRaftClientRequest(
+      ContainerCommandRequestProto req) throws IOException {
+    ClientId clientId = ClientId.randomId();
+    return new RaftClientRequest(clientId,
+        RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()),
+        RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
+        Message.valueOf(req.toByteString()), RaftClientRequest
+        .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY));
+  }
+
+  private void waitForTransactionCompletion(
+      CompletableFuture<Message> future) throws Exception {
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    executorService
+        .invokeAll(Collections.singleton(future::get), 10,
+            TimeUnit.SECONDS); // Timeout of 10 seconds.
+    executorService.shutdown();
+  }
+
+  private TransactionContext startAndWriteStateMachineData(
+      RaftClientRequest request) throws IOException {
+    TransactionContext ctx = stateMachine.startTransaction(request);
+    RaftProtos.LogEntryProto e = ProtoUtils
+        .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(),
+            request.getCallId(), ClientId.randomId(), request.getCallId());
+    ctx.setLogEntry(e);
+    stateMachine.writeStateMachineData(e);
+    return ctx;
+  }
+
+  // ContainerDispatcher for test only purpose.
+  private static class TestContainerDispatcher implements ContainerDispatcher {
+    /**
+     * Dispatches commands to container layer.
+     *
+     * @param msg - Command Request
+     * @return Command Response
+     */
+    @Override
+    public ContainerCommandResponseProto dispatch(
+        ContainerCommandRequestProto msg) {
+      return ContainerTestHelper.getCreateContainerResponse(msg);
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public void setScmId(String scmId) {
+    }
+
+    @Override
+    public Handler getHandler(ContainerType containerType) {
+      return null;
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org