You are viewing a plain-text version of this message. The canonical link to the original was a hyperlink ("here") that is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/05 16:40:09 UTC
[hadoop] branch trunk updated: HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fcea54 HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.
5fcea54 is described below
commit 5fcea54a50753a37b61f0e3c6be5a0236f95861a
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Tue Mar 5 22:09:25 2019 +0530
HDDS-935. Avoid creating an already created container on a datanode in case of disk removal followed by datanode restart. Contributed by Shashikant Banerjee.
---
.../src/main/proto/DatanodeContainerProtocol.proto | 5 ++
.../ozone/container/common/impl/ContainerSet.java | 22 ++++++-
.../container/common/impl/HddsDispatcher.java | 73 +++++++++++++++++++---
.../common/interfaces/ContainerDispatcher.java | 8 +++
.../server/ratis/ContainerStateMachine.java | 70 ++++++++++++++++++---
.../transport/server/ratis/DispatcherContext.java | 33 ++++++++--
.../container/keyvalue/TestKeyValueHandler.java | 3 +
.../rpc/TestContainerStateMachineFailures.java | 37 ++++++++++-
.../transport/server/ratis/TestCSMMetrics.java | 5 ++
.../container/server/TestContainerServer.java | 5 ++
.../server/TestSecureContainerServer.java | 4 ++
11 files changed, 239 insertions(+), 26 deletions(-)
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 197bfad..3b78835 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -142,6 +142,7 @@ enum Result {
UNKNOWN_BCSID = 37;
BCSID_MISMATCH = 38;
CONTAINER_NOT_OPEN = 39;
+ CONTAINER_MISSING = 40;
}
/**
@@ -245,6 +246,10 @@ message ContainerDataProto {
optional ContainerType containerType = 10 [default = KeyValueContainer];
}
+// Set of container IDs created through a Ratis pipeline; serialized into the
+// ContainerStateMachine snapshot file and read back when the snapshot is
+// loaded.
+message ContainerIdSetProto {
+ repeated int64 containerId = 1;
+}
+
enum ContainerType {
KeyValueContainer = 1;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index aff2275..4a7a950 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
@@ -50,7 +52,8 @@ public class ContainerSet {
private final ConcurrentSkipListMap<Long, Container> containerMap = new
ConcurrentSkipListMap<>();
-
+ private final ConcurrentSkipListSet<Long> missingContainerSet =
+ new ConcurrentSkipListSet<>();
/**
* Add Container to container map.
* @param container
@@ -128,6 +131,7 @@ public class ContainerSet {
* @return containerMap Iterator
*/
public Iterator<Map.Entry<Long, Container>> getContainerMapIterator() {
+ containerMap.keySet().stream().collect(Collectors.toSet());
return containerMap.entrySet().iterator();
}
@@ -218,4 +222,20 @@ public class ContainerSet {
return deletionPolicy
.chooseContainerForBlockDeletion(count, containerDataMap);
}
+
+ /**
+ * Returns the set of container IDs that were recorded as created (in the
+ * Ratis snapshot) but are not present in the container map.
+ * The returned set is the internal, mutable set itself, not a copy; callers
+ * such as HddsDispatcher remove IDs from it directly.
+ * @return the live missing container ID set
+ */
+ public Set<Long> getMissingContainerSet() {
+ return missingContainerSet;
+ }
+
+ /**
+ * Builds the missing container set: every container ID in
+ * createdContainerSet (the IDs persisted in the Ratis snapshot) that is not
+ * present in the container map is recorded as missing. Entries already in
+ * the missing set are kept unless they are present in the container map.
+ * This is only expected to be called while the Datanode service is being
+ * initialized, before it is part of any write pipeline.
+ * @param createdContainerSet ContainerId set persisted in the Ratis snapshot
+ */
+ public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ missingContainerSet.addAll(createdContainerSet);
+ // drop the IDs of containers that are actually present in the map
+ missingContainerSet.removeAll(containerMap.keySet());
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 3653cb1..e7a6de3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
@@ -101,7 +102,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
this.containerCloseThreshold = conf.getFloat(
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
-
}
@Override
@@ -133,6 +133,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
}
+ /**
+ * Builds the missing container set by delegating to
+ * ContainerSet#buildMissingContainerSet: IDs in createdContainerSet that are
+ * not present in the ContainerSet are recorded as missing.
+ * @param createdContainerSet container IDs recorded as created, e.g. as
+ * loaded from the Ratis snapshot
+ */
@Override
+ public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ containerSet.buildMissingContainerSet(createdContainerSet);
+ }
+
+ @SuppressWarnings("methodlength")
+ @Override
public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Preconditions.checkNotNull(msg);
@@ -145,18 +151,61 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
Map<String, String> params =
ContainerCommandRequestPBHelper.getAuditParams(msg);
- Container container = null;
- ContainerType containerType = null;
+ Container container;
+ ContainerType containerType;
ContainerCommandResponseProto responseProto = null;
long startTime = System.nanoTime();
ContainerProtos.Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
metrics.incContainerOpsMetrics(cmdType);
+ container = getContainer(containerID);
+ boolean isWriteStage =
+ (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+ && dispatcherContext.getStage()
+ == DispatcherContext.WriteChunkStage.WRITE_DATA);
+ boolean isWriteCommitStage =
+ (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+ && dispatcherContext.getStage()
+ == DispatcherContext.WriteChunkStage.COMMIT_DATA);
+
+ // if the command gets executed other than Ratis, the default wroite stage
+ // is WriteChunkStage.COMBINED
+ boolean isCombinedStage =
+ cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
+ || dispatcherContext.getStage()
+ == DispatcherContext.WriteChunkStage.COMBINED);
+ Set<Long> containerIdSet = null;
+ if (dispatcherContext != null) {
+ containerIdSet = dispatcherContext.getCreateContainerSet();
+ }
+ if (isWriteCommitStage) {
+ // check if the container Id exist in the loaded snapshot file. if
+ // it does not , it infers that , this is a restart of dn where
+ // the we are reapplying the transaction which was not captured in the
+ // snapshot.
+ // just add it to the list, and remove it from missing container set
+ // as it might have been added in the list during "init".
+ Preconditions.checkNotNull(containerIdSet);
+ if (!containerIdSet.contains(containerID)) {
+ containerIdSet.add(containerID);
+ containerSet.getMissingContainerSet().remove(containerID);
+ }
+ }
+ if (getMissingContainerSet().contains(containerID)) {
+ StorageContainerException sce = new StorageContainerException(
+ "ContainerID " + containerID
+ + " has been lost and and cannot be recreated on this DataNode",
+ ContainerProtos.Result.CONTAINER_MISSING);
+ audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
+ return ContainerUtils.logAndReturnError(LOG, sce, msg);
+ }
if (cmdType != ContainerProtos.Type.CreateContainer) {
- container = getContainer(containerID);
-
- if (container == null && (cmdType == ContainerProtos.Type.WriteChunk
+ /**
+ * Create Container should happen only as part of Write_Data phase of
+ * writeChunk.
+ */
+ if (container == null && ((isWriteStage || isCombinedStage)
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
// If container does not exist, create one for WriteChunk and
// PutSmallFile request
@@ -168,7 +217,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
-
+ Preconditions.checkArgument(isWriteStage && containerIdSet != null
+ || dispatcherContext == null);
+ if (containerIdSet != null) {
+ // adds this container to list of containers created in the pipeline
+ containerIdSet.add(containerID);
+ }
container = getContainer(containerID);
}
@@ -406,6 +460,11 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
return containerSet.getContainer(containerID);
}
+ /**
+ * Returns the ContainerSet's missing container ID set (the live set, not a
+ * copy).
+ * NOTE(review): dispatch() also calls this method to reject requests for
+ * missing containers, so the @VisibleForTesting annotation is misleading.
+ * @return the set of container IDs known to be missing on this datanode
+ */
+ @VisibleForTesting
+ public Set<Long> getMissingContainerSet() {
+ return containerSet.getMissingContainerSet();
+ }
+
private ContainerType getContainerType(Container container) {
return container.getContainerType();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 46a0b55..e5a74cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import java.util.Set;
+
/**
* Dispatcher acts as the bridge between the transport layer and
* the actual container layer. This layer is capable of transforming
@@ -59,6 +61,12 @@ public interface ContainerDispatcher {
void init();
/**
+ * Finds the containers that are recorded as created but are absent from the
+ * ContainerSet (for example after a disk was lost) and records them as
+ * missing.
+ * @param createdContainers IDs of the containers known to have been created
+ */
+ void buildMissingContainerSet(Set<Long> createdContainers);
+
+ /**
* Shutdown Dispatcher services.
*/
void shutdown();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 5587488..759f957 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -36,6 +37,8 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ ContainerIdSetProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -77,6 +80,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
*
@@ -126,6 +135,9 @@ public class ContainerStateMachine extends BaseStateMachine {
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
+
+ // keeps track of the containers created per pipeline
+ private final Set<Long> createContainerSet;
private ExecutorService[] executors;
private final int numExecutors;
private final Map<Long, Long> applyTransactionCompletionMap;
@@ -160,6 +172,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.maximumSize(chunkExecutor.getCorePoolSize()).build();
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.tokenVerifier = tokenVerifier;
+ this.createContainerSet = new ConcurrentSkipListSet<>();
}
@Override
@@ -181,26 +194,56 @@ public class ContainerStateMachine extends BaseStateMachine {
loadSnapshot(storage.getLatestSnapshot());
}
- private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
+ /**
+ * Restores state from the given Ratis snapshot: sets the last applied
+ * term/index, reloads the container IDs persisted in the snapshot file into
+ * createContainerSet, and asks the dispatcher to build the missing
+ * container set from them. When there is no snapshot, the missing container
+ * set is not built.
+ * @param snapshot the latest snapshot, or null if there is none
+ * @return the last applied log index, or
+ * RaftServerConstants.INVALID_LOG_INDEX when there is no snapshot
+ * @throws IOException if the snapshot file cannot be read or parsed
+ */
+ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
+ throws IOException {
if (snapshot == null) {
- TermIndex empty = TermIndex.newTermIndex(0,
- RaftServerConstants.INVALID_LOG_INDEX);
- LOG.info("The snapshot info is null." +
- "Setting the last applied index to:" + empty);
+ TermIndex empty =
+ TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
+ LOG.info("The snapshot info is null. Setting the last applied index to: {}",
+ empty);
setLastAppliedTermIndex(empty);
lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
return RaftServerConstants.INVALID_LOG_INDEX;
}
+ final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
- SimpleStateMachineStorage.getTermIndexFromSnapshotFile(
- snapshot.getFile().getPath().toFile());
+ SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
LOG.info("Setting the last applied index to " + last);
setLastAppliedTermIndex(last);
lastIndex = last.getIndex();
+
+ // Reload the IDs of the containers created through this pipeline, as
+ // written by persistContainerSet() during takeSnapshot(), and let the
+ // dispatcher compare them against the containers currently loaded in the
+ // ContainerSet to build the missing container set.
+ try (FileInputStream fin = new FileInputStream(snapshotFile)) {
+ byte[] containerIds = IOUtils.toByteArray(fin);
+ ContainerProtos.ContainerIdSetProto proto =
+ ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
+ // createContainerSet keeps growing afterwards as new containers get
+ // created through this pipeline.
+ createContainerSet.addAll(proto.getContainerIdList());
+ dispatcher.buildMissingContainerSet(createContainerSet);
+ }
return last.getIndex();
}
+ /**
+ * Writes the IDs in createContainerSet (the containers created through this
+ * pipeline) to the given stream as a serialized ContainerIdSetProto. Called
+ * while taking a Ratis snapshot. The stream is not closed by this method.
+ * @param out OutputStream mapped to the Ratis snapshot file
+ * @throws IOException if writing to the stream fails
+ */
+ public void persistContainerSet(OutputStream out) throws IOException {
+ ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
+ builder.addAllContainerId(createContainerSet);
+ // TODO : while snapshot is being taken, deleteContainer call should not
+ // happen. Lock protection will be required if delete
+ // container happens outside of Ratis.
+ IOUtils.write(builder.build().toByteArray(), out);
+ }
+
@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
@@ -211,8 +254,13 @@ public class ContainerStateMachine extends BaseStateMachine {
LOG.info("Taking a snapshot to file {}", snapshotFile);
try {
//TODO: For now, just create the file to save the term index,
- //persist open container info to snapshot later.
- snapshotFile.createNewFile();
+ boolean created = snapshotFile.createNewFile();
+ if (!created) {
+ throw new IOException("Failed to create ratis snapshot file");
+ }
+ try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
+ persistContainerSet(fos);
+ }
} catch(IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + ti);
@@ -344,6 +392,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setTerm(term)
.setLogIndex(entryIndex)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+ .setCreateContainerSet(createContainerSet)
.build();
CompletableFuture<Message> writeChunkFuture;
try {
@@ -586,6 +635,9 @@ public class ContainerStateMachine extends BaseStateMachine {
builder
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
+ if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
+ builder.setCreateContainerSet(createContainerSet);
+ }
try {
Message msg = runCommand(requestProto, builder.build());
future = CompletableFuture.supplyAsync(() -> msg,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index 28033aa..446f19f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import java.util.Set;
+
/**
* DispatcherContext class holds transport protocol specific context info
* required for execution of container commands over the container dispatcher.
@@ -43,12 +45,15 @@ public final class DispatcherContext {
// the log index in Ratis log to which the request belongs to
private final long logIndex;
+ private final Set<Long> createContainerSet;
+
private DispatcherContext(long term, long index, WriteChunkStage stage,
- boolean readFromTmpFile) {
+ boolean readFromTmpFile, Set<Long> containerSet) {
this.term = term;
this.logIndex = index;
this.stage = stage;
this.readFromTmpFile = readFromTmpFile;
+ // stored by reference (not copied); null when the Builder did not set it
+ this.createContainerSet = containerSet;
}
public long getLogIndex() {
@@ -67,6 +72,10 @@ public final class DispatcherContext {
return stage;
}
+ /**
+ * Returns the set of container IDs created in this RaftGroup, exactly as
+ * passed to the Builder (shared, not copied).
+ * @return the created-container set, or null if it was not set
+ */
+ public Set<Long> getCreateContainerSet() {
+ return createContainerSet;
+ }
+
/**
* Builder class for building DispatcherContext.
*/
@@ -75,11 +84,12 @@ public final class DispatcherContext {
private boolean readFromTmpFile = false;
private long term;
private long logIndex;
+ private Set<Long> createContainerSet;
/**
* Sets the WriteChunkStage.
*
- * @param stage WriteChunk Stage
+ * @param writeChunkStage WriteChunk Stage
* @return DispatcherContext.Builder
*/
public Builder setStage(WriteChunkStage writeChunkStage) {
@@ -90,7 +100,7 @@ public final class DispatcherContext {
/**
* Sets the flag for reading from tmp chunk files.
*
- * @param readFromTmpFile whether to read from tmp chunk file or not
+ * @param setReadFromTmpFile whether to read from tmp chunk file or not
* @return DispatcherContext.Builder
*/
public Builder setReadFromTmpFile(boolean setReadFromTmpFile) {
@@ -101,7 +111,7 @@ public final class DispatcherContext {
/**
* Sets the current term for the container request from Ratis.
*
- * @param term current term
+ * @param currentTerm current term
* @return DispatcherContext.Builder
*/
public Builder setTerm(long currentTerm) {
@@ -112,7 +122,7 @@ public final class DispatcherContext {
/**
* Sets the logIndex for the container request from Ratis.
*
- * @param logIndex log index
+ * @param index log index
* @return DispatcherContext.Builder
*/
public Builder setLogIndex(long index) {
@@ -121,12 +131,23 @@ public final class DispatcherContext {
}
/**
+ * Sets the set that tracks all container IDs created in the RaftGroup.
+ * The set is stored by reference, so containers the dispatcher adds to it
+ * are visible to the owner of the set.
+ * @param set the shared created-container set
+ * @return this Builder
+ */
+ public Builder setCreateContainerSet(Set<Long> set) {
+ this.createContainerSet = set;
+ return this;
+ }
+ /**
* Builds and returns DispatcherContext instance.
*
+ * The createContainerSet, if set, is passed through by reference.
+ *
* @return DispatcherContext
*/
public DispatcherContext build() {
- return new DispatcherContext(term, logIndex, stage, readFromTmpFile);
+ return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
+ createContainerSet);
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index a2e7f50..8ef9e19 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -59,6 +59,7 @@ import static org.mockito.Mockito.times;
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
import java.util.UUID;
/**
@@ -88,6 +89,8 @@ public class TestKeyValueHandler {
Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod();
Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
Mockito.mock(KeyValueContainer.class));
+ Mockito.when(dispatcher.getMissingContainerSet())
+ .thenReturn(new HashSet<>());
Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod();
doCallRealMethod().when(dispatcher).setMetricsForTesting(any());
dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 3d826a5..68f1ecc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
@@ -41,6 +43,7 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys.
@@ -133,21 +136,49 @@ public class TestContainerStateMachineFailures {
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-
// delete the container dir
FileUtil.fullyDelete(new File(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath()));
-
key.close();
+ long containerID = omKeyLocationInfo.getContainerID();
+
// Make sure the container is marked unhealthy
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainer(containerID)
.getContainerState()
== ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ OzoneContainer ozoneContainer = cluster.getHddsDatanodes().get(0)
+ .getDatanodeStateMachine().getContainer();
+ // make sure the missing containerSet is empty
+ HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+ Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
+
+ // restart the hdds datanode and see if the container is listed in the
+ // in the missing container set and not in the regular set
+ cluster.restartHddsDatanode(0, true);
+ ozoneContainer = cluster.getHddsDatanodes().get(0)
+ .getDatanodeStateMachine().getContainer();
+ dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+
+ Assert
+ .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+ Assert.assertTrue(dispatcher.getMissingContainerSet()
+ .contains(containerID));
+ ContainerProtos.ContainerCommandRequestProto.Builder request =
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ request.setCmdType(ContainerProtos.Type.CreateContainer);
+ request.setContainerID(containerID);
+ request.setCreateContainer(
+ ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+ request.setTraceID(UUID.randomUUID().toString());
+ request.setDatanodeUuid(
+ cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
+ Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
+ dispatcher.dispatch(request.build(), null).getResult());
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index d0a991c..d2f2c91 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -51,6 +51,7 @@ import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.function.CheckedBiConsumer;
+import java.util.Set;
import java.util.function.BiConsumer;
import org.junit.Test;
@@ -196,5 +197,9 @@ public class TestCSMMetrics {
public void setScmId(String scmId) {
}
+
+ @Override
+ public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ // intentionally a no-op in this test dispatcher
+ }
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 8540939..63abd36 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -68,6 +68,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.Set;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@@ -286,5 +287,9 @@ public class TestContainerServer {
public void setScmId(String scmId) {
}
+
+ @Override
+ public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ // intentionally a no-op in this test dispatcher
+ }
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 887c35a..f20feff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -58,6 +58,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
@@ -232,7 +233,10 @@ public class TestSecureContainerServer {
@Override
public void setScmId(String scmId) {
+ }
+ @Override
+ public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ // intentionally a no-op in this test dispatcher
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org