Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/05 16:40:09 UTC

[hadoop] branch trunk updated: HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5fcea54  HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.
5fcea54 is described below

commit 5fcea54a50753a37b61f0e3c6be5a0236f95861a
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Tue Mar 5 22:09:25 2019 +0530

    HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.
---
 .../src/main/proto/DatanodeContainerProtocol.proto |  5 ++
 .../ozone/container/common/impl/ContainerSet.java  | 22 ++++++-
 .../container/common/impl/HddsDispatcher.java      | 73 +++++++++++++++++++---
 .../common/interfaces/ContainerDispatcher.java     |  8 +++
 .../server/ratis/ContainerStateMachine.java        | 70 ++++++++++++++++++---
 .../transport/server/ratis/DispatcherContext.java  | 33 ++++++++--
 .../container/keyvalue/TestKeyValueHandler.java    |  3 +
 .../rpc/TestContainerStateMachineFailures.java     | 37 ++++++++++-
 .../transport/server/ratis/TestCSMMetrics.java     |  5 ++
 .../container/server/TestContainerServer.java      |  5 ++
 .../server/TestSecureContainerServer.java          |  4 ++
 11 files changed, 239 insertions(+), 26 deletions(-)

diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 197bfad..3b78835 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -142,6 +142,7 @@ enum Result {
   UNKNOWN_BCSID = 37;
   BCSID_MISMATCH = 38;
   CONTAINER_NOT_OPEN = 39;
+  CONTAINER_MISSING = 40;
 }
 
 /**
@@ -245,6 +246,10 @@ message ContainerDataProto {
   optional ContainerType containerType = 10 [default = KeyValueContainer];
 }
 
+message ContainerIdSetProto {
+    repeated int64 containerId = 1;
+}
+
 enum ContainerType {
   KeyValueContainer = 1;
 }
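
As a quick illustration of the new ContainerIdSetProto message, here is a
minimal round-trip sketch. The class name and the id values are hypothetical;
only the generated builder/parse calls that this patch itself uses
(addAllContainerId, toByteArray, parseFrom, getContainerIdList) are assumed:

    import java.util.Arrays;

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerIdSetProto;

    public class ContainerIdSetProtoDemo {
      public static void main(String[] args) throws Exception {
        // Record a few created container ids (values are hypothetical).
        ContainerIdSetProto ids = ContainerIdSetProto.newBuilder()
            .addAllContainerId(Arrays.asList(1L, 2L, 3L))
            .build();
        // Serialize the set the way persistContainerSet() does further below...
        byte[] wire = ids.toByteArray();
        // ...and parse it back the way loadSnapshot() does.
        ContainerIdSetProto parsed = ContainerIdSetProto.parseFrom(wire);
        System.out.println(parsed.getContainerIdList()); // prints [1, 2, 3]
      }
    }
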
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index aff2275..4a7a950 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.stream.Collectors;
 
 
@@ -50,7 +52,8 @@ public class ContainerSet {
 
   private final ConcurrentSkipListMap<Long, Container> containerMap = new
       ConcurrentSkipListMap<>();
-
+  private final ConcurrentSkipListSet<Long> missingContainerSet =
+      new ConcurrentSkipListSet<>();
   /**
    * Add Container to container map.
    * @param container
@@ -218,4 +222,20 @@ public class ContainerSet {
     return deletionPolicy
         .chooseContainerForBlockDeletion(count, containerDataMap);
   }
+
+  public Set<Long> getMissingContainerSet() {
+    return missingContainerSet;
+  }
+
+  /**
+   * Builds the missing container set by diffing the container ids recorded
+   * as created (persisted in the Ratis snapshot) against the containers
+   * actually found on disk. This is only called while the Datanode Service
+   * initializes, before it is part of any write pipeline.
+   * @param createdContainerSet ContainerId set persisted in the Ratis snapshot
+   */
+  public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+    missingContainerSet.addAll(createdContainerSet);
+    missingContainerSet.removeAll(containerMap.keySet());
+  }
 }
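
The missing-set computation above is a plain set difference. A minimal,
self-contained sketch of the same two steps (class and variable names are
hypothetical):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class MissingContainerSetDemo {
      public static void main(String[] args) {
        // Ids recorded as created in the Ratis snapshot.
        Set<Long> createdContainerSet = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        // Ids actually found on disk after restart; container 2 was on the
        // lost disk.
        Set<Long> containersOnDisk = new HashSet<>(Arrays.asList(1L, 3L));

        // The same two steps as buildMissingContainerSet() above.
        Set<Long> missingContainerSet = new ConcurrentSkipListSet<>();
        missingContainerSet.addAll(createdContainerSet);
        missingContainerSet.removeAll(containersOnDisk);

        System.out.println(missingContainerSet); // prints [2]
      }
    }
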
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 3653cb1..e7a6de3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
@@ -101,7 +102,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     this.containerCloseThreshold = conf.getFloat(
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
-
   }
 
   @Override
@@ -133,6 +133,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
   }
 
   @Override
+  public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+    containerSet.buildMissingContainerSet(createdContainerSet);
+  }
+
+  @SuppressWarnings("methodlength")
+  @Override
   public ContainerCommandResponseProto dispatch(
       ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
     Preconditions.checkNotNull(msg);
@@ -145,18 +151,61 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     Map<String, String> params =
         ContainerCommandRequestPBHelper.getAuditParams(msg);
 
-    Container container = null;
-    ContainerType containerType = null;
+    Container container;
+    ContainerType containerType;
     ContainerCommandResponseProto responseProto = null;
     long startTime = System.nanoTime();
     ContainerProtos.Type cmdType = msg.getCmdType();
     long containerID = msg.getContainerID();
     metrics.incContainerOpsMetrics(cmdType);
+    container = getContainer(containerID);
+    boolean isWriteStage =
+        (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+            && dispatcherContext.getStage()
+            == DispatcherContext.WriteChunkStage.WRITE_DATA);
+    boolean isWriteCommitStage =
+        (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+            && dispatcherContext.getStage()
+            == DispatcherContext.WriteChunkStage.COMMIT_DATA);
+
+    // if the command is executed via a path other than Ratis, the default
+    // write stage is WriteChunkStage.COMBINED
+    boolean isCombinedStage =
+        cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
+            || dispatcherContext.getStage()
+            == DispatcherContext.WriteChunkStage.COMBINED);
+    Set<Long> containerIdSet = null;
+    if (dispatcherContext != null) {
+      containerIdSet = dispatcherContext.getCreateContainerSet();
+    }
+    if (isWriteCommitStage) {
+      // Check whether the container id exists in the loaded snapshot file.
+      // If it does not, this is a restart of the datanode where we are
+      // re-applying a transaction that was not captured in the snapshot.
+      // Just add the id to the set, and remove it from the missing
+      // container set, since it might have been added there during
+      // "init".
+      Preconditions.checkNotNull(containerIdSet);
+      if (!containerIdSet.contains(containerID)) {
+        containerIdSet.add(containerID);
+        containerSet.getMissingContainerSet().remove(containerID);
+      }
+    }
+    if (getMissingContainerSet().contains(containerID)) {
+      StorageContainerException sce = new StorageContainerException(
+          "ContainerID " + containerID
+              + " has been lost and and cannot be recreated on this DataNode",
+          ContainerProtos.Result.CONTAINER_MISSING);
+      audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
+      return ContainerUtils.logAndReturnError(LOG, sce, msg);
+    }
 
     if (cmdType != ContainerProtos.Type.CreateContainer) {
-      container = getContainer(containerID);
-
-      if (container == null && (cmdType == ContainerProtos.Type.WriteChunk
+      /*
+       * Container creation should happen only in the WRITE_DATA or COMBINED
+       * stage of WriteChunk, or for PutSmallFile.
+       */
+      if (container == null && ((isWriteStage || isCombinedStage)
           || cmdType == ContainerProtos.Type.PutSmallFile)) {
         // If container does not exist, create one for WriteChunk and
         // PutSmallFile request
@@ -168,7 +217,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
           audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
           return ContainerUtils.logAndReturnError(LOG, sce, msg);
         }
-
+        Preconditions.checkArgument((isWriteStage && containerIdSet != null)
+            || dispatcherContext == null);
+        if (containerIdSet != null) {
+          // add this container to the set of containers created in the pipeline
+          containerIdSet.add(containerID);
+        }
         container = getContainer(containerID);
       }
 
@@ -406,6 +460,11 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     return containerSet.getContainer(containerID);
   }
 
+  @VisibleForTesting
+  public Set<Long> getMissingContainerSet() {
+    return containerSet.getMissingContainerSet();
+  }
+
   private ContainerType getContainerType(Container container) {
     return container.getContainerType();
   }
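
To summarize the dispatch-time bookkeeping added above: on COMMIT_DATA the
container id is recorded (and rescued from the missing set, since the
transaction post-dates the last snapshot), and any request against a container
that stays missing is rejected. A simplified, hypothetical rendering of that
logic, not the dispatcher's actual method:

    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListSet;

    // All names are hypothetical stand-ins for the HddsDispatcher fields above.
    final class MissingContainerGuard {
      /** Returns true if the request may proceed, false if the container is lost. */
      static boolean checkAndRecord(long containerID, boolean isWriteCommitStage,
          Set<Long> createdContainerSet, Set<Long> missingContainerSet) {
        if (isWriteCommitStage && !createdContainerSet.contains(containerID)) {
          // COMMIT_DATA after a restart: the transaction post-dates the last
          // snapshot, so record the container and clear it from the missing set.
          createdContainerSet.add(containerID);
          missingContainerSet.remove(containerID);
        }
        // A container still marked missing must never be recreated here.
        return !missingContainerSet.contains(containerID);
      }

      public static void main(String[] args) {
        Set<Long> created = new ConcurrentSkipListSet<>();
        Set<Long> missing = new ConcurrentSkipListSet<>();
        missing.add(5L);
        System.out.println(checkAndRecord(5L, false, created, missing)); // false
        System.out.println(checkAndRecord(5L, true, created, missing));  // true
      }
    }
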
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 46a0b55..e5a74cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 
+import java.util.Set;
+
 /**
  * Dispatcher acts as the bridge between the transport layer and
  * the actual container layer. This layer is capable of transforming
@@ -59,6 +61,12 @@ public interface ContainerDispatcher {
   void init();
 
   /**
+   * Finds and builds the set of missing containers in the ContainerSet,
+   * e.g. after a disk is lost.
+   */
+  void buildMissingContainerSet(Set<Long> createdContainers);
+
+  /**
    * Shutdown Dispatcher services.
    */
   void shutdown();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 5587488..759f957 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
@@ -36,6 +37,8 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    ContainerIdSetProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -77,6 +80,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+
 
 /** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
  *
@@ -126,6 +135,9 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final XceiverServerRatis ratisServer;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
+
+  // keeps track of the containers created per pipeline
+  private final Set<Long> createContainerSet;
   private ExecutorService[] executors;
   private final int numExecutors;
   private final Map<Long, Long> applyTransactionCompletionMap;
@@ -160,6 +172,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         .maximumSize(chunkExecutor.getCorePoolSize()).build();
     this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.tokenVerifier = tokenVerifier;
+    this.createContainerSet = new ConcurrentSkipListSet<>();
   }
 
   @Override
@@ -181,26 +194,56 @@ public class ContainerStateMachine extends BaseStateMachine {
     loadSnapshot(storage.getLatestSnapshot());
   }
 
-  private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
+  private long loadSnapshot(SingleFileSnapshotInfo snapshot)
+      throws IOException {
     if (snapshot == null) {
-      TermIndex empty = TermIndex.newTermIndex(0,
-          RaftServerConstants.INVALID_LOG_INDEX);
-      LOG.info("The snapshot info is null." +
-          "Setting the last applied index to:" + empty);
+      TermIndex empty =
+          TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
+      LOG.info(
+          "The snapshot info is null. Setting the last applied index to: "
+              + empty);
       setLastAppliedTermIndex(empty);
       lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
       return RaftServerConstants.INVALID_LOG_INDEX;
     }
 
+    final File snapshotFile = snapshot.getFile().getPath().toFile();
     final TermIndex last =
-        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(
-            snapshot.getFile().getPath().toFile());
+        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
     LOG.info("Setting the last applied index to " + last);
     setLastAppliedTermIndex(last);
     lastIndex = last.getIndex();
+
+    // initialize the dispatcher with the snapshot so that it builds the
+    // missing container list
+    try (FileInputStream fin = new FileInputStream(snapshotFile)) {
+      byte[] containerIds = IOUtils.toByteArray(fin);
+      ContainerProtos.ContainerIdSetProto proto =
+          ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
+      // read the created containers list from the snapshot file and add it to
+      // the createContainerSet here.
+      // createContainerSet will further grow as and when containers get created
+      createContainerSet.addAll(proto.getContainerIdList());
+      dispatcher.buildMissingContainerSet(createContainerSet);
+    }
     return last.getIndex();
   }
 
+  /**
+   * As part of taking a snapshot with the Ratis StateMachine, persists the
+   * set of created containers to the snapshot file.
+   * @param out OutputStream mapped to the Ratis snapshot file
+   * @throws IOException
+   */
+  public void persistContainerSet(OutputStream out) throws IOException {
+    ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
+    builder.addAllContainerId(createContainerSet);
+    // TODO: while a snapshot is being taken, a deleteContainer call must
+    // not happen. Lock protection will be required if container deletion
+    // happens outside of Ratis.
+    IOUtils.write(builder.build().toByteArray(), out);
+  }
+
   @Override
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
@@ -211,8 +254,13 @@ public class ContainerStateMachine extends BaseStateMachine {
       LOG.info("Taking a snapshot to file {}", snapshotFile);
       try {
         //TODO: For now, just create the file to save the term index,
-        //persist open container info to snapshot later.
-        snapshotFile.createNewFile();
+        boolean created = snapshotFile.createNewFile();
+        if (!created) {
+          throw new IOException("Failed to create ratis snapshot file");
+        }
+        try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
+          persistContainerSet(fos);
+        }
       } catch(IOException ioe) {
         LOG.warn("Failed to write snapshot file \"" + snapshotFile
             + "\", last applied index=" + ti);
@@ -344,6 +392,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setTerm(term)
             .setLogIndex(entryIndex)
             .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+            .setCreateContainerSet(createContainerSet)
             .build();
     CompletableFuture<Message> writeChunkFuture;
     try {
@@ -586,6 +635,9 @@ public class ContainerStateMachine extends BaseStateMachine {
         builder
             .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
       }
+      if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+        builder.setCreateContainerSet(createContainerSet);
+      }
       try {
         Message msg = runCommand(requestProto, builder.build());
         future = CompletableFuture.supplyAsync(() -> msg,
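
Taken together, takeSnapshot() and loadSnapshot() now round-trip the
created-container ids through the Ratis snapshot file. A minimal standalone
sketch of that round-trip, assuming only the commons-io IOUtils calls and
generated proto methods used in the hunks above (file name and ids are
hypothetical):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.Arrays;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerIdSetProto;

    public class SnapshotRoundTripDemo {
      public static void main(String[] args) throws Exception {
        File snapshotFile = File.createTempFile("ratis-snapshot", ".bin");

        // Persist, mirroring persistContainerSet() above.
        ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
        builder.addAllContainerId(Arrays.asList(7L, 8L));
        try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
          IOUtils.write(builder.build().toByteArray(), fos);
        }

        // Reload, mirroring the new block in loadSnapshot() above.
        try (FileInputStream fin = new FileInputStream(snapshotFile)) {
          byte[] containerIds = IOUtils.toByteArray(fin);
          ContainerIdSetProto proto = ContainerIdSetProto.parseFrom(containerIds);
          System.out.println(proto.getContainerIdList()); // prints [7, 8]
        }
      }
    }
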
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index 28033aa..446f19f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import java.util.Set;
+
 /**
  * DispatcherContext class holds transport protocol specific context info
  * required for execution of container commands over the container dispatcher.
@@ -43,12 +45,15 @@ public final class DispatcherContext {
   // the log index in Ratis log to which the request belongs to
   private final long logIndex;
 
+  private final Set<Long> createContainerSet;
+
   private DispatcherContext(long term, long index, WriteChunkStage stage,
-      boolean readFromTmpFile) {
+      boolean readFromTmpFile, Set<Long> containerSet) {
     this.term = term;
     this.logIndex = index;
     this.stage = stage;
     this.readFromTmpFile = readFromTmpFile;
+    this.createContainerSet = containerSet;
   }
 
   public long getLogIndex() {
@@ -67,6 +72,10 @@ public final class DispatcherContext {
     return stage;
   }
 
+  public Set<Long> getCreateContainerSet() {
+    return createContainerSet;
+  }
+
   /**
    * Builder class for building DispatcherContext.
    */
@@ -75,11 +84,12 @@ public final class DispatcherContext {
     private boolean readFromTmpFile = false;
     private long term;
     private long logIndex;
+    private Set<Long> createContainerSet;
 
     /**
      * Sets the WriteChunkStage.
      *
-     * @param stage WriteChunk Stage
+     * @param writeChunkStage WriteChunk Stage
      * @return DispatcherContext.Builder
      */
     public Builder setStage(WriteChunkStage writeChunkStage) {
@@ -90,7 +100,7 @@ public final class DispatcherContext {
     /**
      * Sets the flag for reading from tmp chunk files.
      *
-     * @param readFromTmpFile whether to read from tmp chunk file or not
+     * @param setReadFromTmpFile whether to read from tmp chunk file or not
      * @return DispatcherContext.Builder
      */
     public Builder setReadFromTmpFile(boolean setReadFromTmpFile) {
@@ -101,7 +111,7 @@ public final class DispatcherContext {
     /**
      * Sets the current term for the container request from Ratis.
      *
-     * @param term current term
+     * @param currentTerm current term
      * @return DispatcherContext.Builder
      */
     public Builder setTerm(long currentTerm) {
@@ -112,7 +122,7 @@ public final class DispatcherContext {
     /**
      * Sets the logIndex for the container request from Ratis.
      *
-     * @param logIndex log index
+     * @param index log index
      * @return DispatcherContext.Builder
      */
     public Builder setLogIndex(long index) {
@@ -121,12 +131,23 @@ public final class DispatcherContext {
     }
 
     /**
+     * Sets the createContainerSet, which tracks the containerIds created
+     * per RaftGroup.
+     * @param set createContainerSet
+     * @return Builder
+     */
+    public Builder setCreateContainerSet(Set<Long> set) {
+      this.createContainerSet = set;
+      return this;
+    }
+    /**
      * Builds and returns DispatcherContext instance.
      *
      * @return DispatcherContext
      */
     public DispatcherContext build() {
-      return new DispatcherContext(term, logIndex, stage, readFromTmpFile);
+      return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
+          createContainerSet);
     }
 
   }
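
For reference, a minimal sketch of how ContainerStateMachine threads the
per-pipeline set through the new builder hook (it mirrors the
writeStateMachineData hunk above; the term/index values are hypothetical, and
Builder is assumed to be instantiable as shown):

    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListSet;

    import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

    public class DispatcherContextDemo {
      public static void main(String[] args) {
        Set<Long> createContainerSet = new ConcurrentSkipListSet<>();
        DispatcherContext context = new DispatcherContext.Builder()
            .setTerm(1L)       // hypothetical Ratis term
            .setLogIndex(42L)  // hypothetical log index
            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
            .setCreateContainerSet(createContainerSet)
            .build();
        // The dispatcher can record containers created on this pipeline in the
        // shared set, which the state machine later persists in its snapshot.
        context.getCreateContainerSet().add(100L);
        System.out.println(createContainerSet); // prints [100]
      }
    }
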
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index a2e7f50..8ef9e19 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -59,6 +59,7 @@ import static org.mockito.Mockito.times;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.UUID;
 
 /**
@@ -88,6 +89,8 @@ public class TestKeyValueHandler {
     Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod();
     Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
         Mockito.mock(KeyValueContainer.class));
+    Mockito.when(dispatcher.getMissingContainerSet())
+        .thenReturn(new HashSet<>());
     Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod();
     doCallRealMethod().when(dispatcher).setMetricsForTesting(any());
     dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 3d826a5..68f1ecc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -41,6 +43,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.
@@ -133,21 +136,49 @@ public class TestContainerStateMachineFailures {
         groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-
     // delete the container dir
     FileUtil.fullyDelete(new File(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-
     key.close();
+    long containerID = omKeyLocationInfo.getContainerID();
+
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainer(containerID)
             .getContainerState()
             == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    OzoneContainer ozoneContainer = cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine().getContainer();
+    // make sure the missing containerSet is empty
+    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+    Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
+
+    // restart the hdds datanode and see if the container is listed in
+    // the missing container set and not in the regular set
+    cluster.restartHddsDatanode(0, true);
+    ozoneContainer = cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine().getContainer();
+    dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+
+    Assert
+        .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+    Assert.assertTrue(dispatcher.getMissingContainerSet()
+        .contains(containerID));
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setContainerID(containerID);
+    request.setCreateContainer(
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(
+        cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
+    Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
+        dispatcher.dispatch(request.build(), null).getResult());
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index d0a991c..d2f2c91 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -51,6 +51,7 @@ import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 
+import java.util.Set;
 import java.util.function.BiConsumer;
 
 import org.junit.Test;
@@ -196,5 +197,9 @@ public class TestCSMMetrics {
     public void setScmId(String scmId) {
 
     }
+
+    @Override
+    public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+    }
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 8540939..63abd36 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -68,6 +68,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.Set;
 
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@@ -286,5 +287,9 @@ public class TestContainerServer {
     public void setScmId(String scmId) {
 
     }
+
+    @Override
+    public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+    }
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 887c35a..f20feff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
@@ -232,7 +233,10 @@ public class TestSecureContainerServer {
 
     @Override
     public void setScmId(String scmId) {
+    }
 
+    @Override
+    public void buildMissingContainerSet(Set<Long> createdContainerSet) {
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org