You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/27 17:54:46 UTC
[hadoop] branch trunk updated: Revert "HDDS-1610. applyTransaction
failure should not be lost on restart. Contributed by Shashikant Banerjee."
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce8eb12 Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."
ce8eb12 is described below
commit ce8eb1283acbebb990a4f1e40848d78700309222
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Tue Aug 27 23:23:44 2019 +0530
Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."
This reverts commit 62445021d5d57b0d49adcb1bd4365c13532328fc because it contained unintended changes to the DirectoryWithSnapshotFeature class.
---
.../server/ratis/ContainerStateMachine.java | 84 ++++-------
.../transport/server/ratis/XceiverServerRatis.java | 9 --
.../proto/StorageContainerDatanodeProtocol.proto | 1 -
.../snapshot/DirectoryWithSnapshotFeature.java | 4 +-
.../rpc/TestContainerStateMachineFailures.java | 156 ++++++---------------
.../ozone/container/ContainerTestHelper.java | 16 ---
.../freon/TestFreonWithDatanodeFastRestart.java | 17 ++-
7 files changed, 80 insertions(+), 207 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index aadec8d..f4d4744 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
@@ -84,7 +83,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -149,7 +147,6 @@ public class ContainerStateMachine extends BaseStateMachine {
private final Cache<Long, ByteString> stateMachineDataCache;
private final boolean isBlockTokenEnabled;
private final TokenVerifier tokenVerifier;
- private final AtomicBoolean isStateMachineHealthy;
private final Semaphore applyTransactionSemaphore;
/**
@@ -187,7 +184,6 @@ public class ContainerStateMachine extends BaseStateMachine {
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
- isStateMachineHealthy = new AtomicBoolean(true);
this.executors = new ExecutorService[numContainerOpExecutors];
for (int i = 0; i < numContainerOpExecutors; i++) {
final int index = i;
@@ -269,14 +265,6 @@ public class ContainerStateMachine extends BaseStateMachine {
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
long startTime = Time.monotonicNow();
- if (!isStateMachineHealthy.get()) {
- String msg =
- "Failed to take snapshot " + " for " + gid + " as the stateMachine"
- + " is unhealthy. The last applied index is at " + ti;
- StateMachineException sme = new StateMachineException(msg);
- LOG.error(msg);
- throw sme;
- }
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
@@ -287,12 +275,12 @@ public class ContainerStateMachine extends BaseStateMachine {
// make sure the snapshot file is synced
fos.getFD().sync();
} catch (IOException ioe) {
- LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
+ LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
snapshotFile);
throw ioe;
}
- LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
- snapshotFile, (Time.monotonicNow() - startTime));
+ LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
+ gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@@ -397,12 +385,17 @@ public class ContainerStateMachine extends BaseStateMachine {
return response;
}
- private ContainerCommandResponseProto runCommand(
+ private ContainerCommandResponseProto runCommandGetResponse(
ContainerCommandRequestProto requestProto,
DispatcherContext context) {
return dispatchCommand(requestProto, context);
}
+ private Message runCommand(ContainerCommandRequestProto requestProto,
+ DispatcherContext context) {
+ return runCommandGetResponse(requestProto, context)::toByteString;
+ }
+
private ExecutorService getCommandExecutor(
ContainerCommandRequestProto requestProto) {
int executorId = (int)(requestProto.getContainerID() % executors.length);
@@ -432,7 +425,7 @@ public class ContainerStateMachine extends BaseStateMachine {
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() ->
- runCommand(requestProto, context), chunkExecutor);
+ runCommandGetResponse(requestProto, context), chunkExecutor);
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
@@ -509,8 +502,7 @@ public class ContainerStateMachine extends BaseStateMachine {
metrics.incNumQueryStateMachineOps();
final ContainerCommandRequestProto requestProto =
getContainerCommandRequestProto(request.getContent());
- return CompletableFuture
- .completedFuture(runCommand(requestProto, null)::toByteString);
+ return CompletableFuture.completedFuture(runCommand(requestProto, null));
} catch (IOException e) {
metrics.incNumQueryStateMachineFails();
return completeExceptionally(e);
@@ -682,58 +674,30 @@ public class ContainerStateMachine extends BaseStateMachine {
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet);
}
- CompletableFuture<Message> applyTransactionFuture =
- new CompletableFuture<>();
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
- CompletableFuture<ContainerCommandResponseProto> future =
- CompletableFuture.supplyAsync(
- () -> runCommand(requestProto, builder.build()),
+ CompletableFuture<Message> future = CompletableFuture
+ .supplyAsync(() -> runCommand(requestProto, builder.build()),
getCommandExecutor(requestProto));
- future.thenApply(r -> {
+
+ future.thenAccept(m -> {
if (trx.getServerRole() == RaftPeerRole.LEADER) {
long startTime = (long) trx.getStateMachineContext();
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}
- if (r.getResult() != ContainerProtos.Result.SUCCESS) {
- StorageContainerException sce =
- new StorageContainerException(r.getMessage(), r.getResult());
- LOG.error(
- "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
- + "{} Container Result: {}", gid, r.getCmdType(), index,
- r.getMessage(), r.getResult());
- metrics.incNumApplyTransactionsFails();
- // Since the applyTransaction now is completed exceptionally,
- // before any further snapshot is taken , the exception will be
- // caught in stateMachineUpdater in Ratis and ratis server will
- // shutdown.
- applyTransactionFuture.completeExceptionally(sce);
- isStateMachineHealthy.compareAndSet(true, false);
- ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
- } else {
- LOG.debug(
- "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
- + "{} Container Result: {}", gid, r.getCmdType(), index,
- r.getMessage(), r.getResult());
- applyTransactionFuture.complete(r::toByteString);
- if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
- metrics.incNumBytesCommittedCount(
- requestProto.getWriteChunk().getChunkData().getLen());
- }
- // add the entry to the applyTransactionCompletionMap only if the
- // stateMachine is healthy i.e, there has been no applyTransaction
- // failures before.
- if (isStateMachineHealthy.get()) {
- final Long previous = applyTransactionCompletionMap
+
+ final Long previous =
+ applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
- Preconditions.checkState(previous == null);
- updateLastApplied();
- }
+ Preconditions.checkState(previous == null);
+ if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+ metrics.incNumBytesCommittedCount(
+ requestProto.getWriteChunk().getChunkData().getLen());
}
- return applyTransactionFuture;
+ updateLastApplied();
}).whenComplete((r, t) -> applyTransactionSemaphore.release());
- return applyTransactionFuture;
+ return future;
} catch (IOException | InterruptedException e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b4021cf..f0ed28b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -609,15 +609,6 @@ public final class XceiverServerRatis extends XceiverServer {
handlePipelineFailure(groupId, roleInfoProto);
}
- void handleApplyTransactionFailure(RaftGroupId groupId,
- RaftProtos.RaftPeerRole role) {
- UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
- String msg =
- "Ratis Transaction failure in datanode " + dnId + " with role " + role
- + " .Triggering pipeline close action.";
- triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
- }
/**
* The fact that the snapshot contents cannot be used to actually catch up
* the follower, it is the reason to initiate close pipeline and
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1d09dfa..500735a 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -214,7 +214,6 @@ message ClosePipelineInfo {
enum Reason {
PIPELINE_FAILED = 1;
PIPELINE_LOG_FAILED = 2;
- STATEMACHINE_TRANSACTION_FAILED = 3;
}
required PipelineID pipelineID = 1;
optional Reason reason = 3;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index ffbc174..7fb639c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -742,8 +742,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
if (currentINode.isLastReference()) {
// if this is the last reference, the created list can be
// destroyed.
- // priorDiff.getChildrenDiff().destroyCreatedList(
- // reclaimContext, currentINode);
+ priorDiff.getChildrenDiff().destroyCreatedList(
+ reclaimContext, currentINode);
} else {
// we only check the node originally in prior's created list
for (INode cNode : priorDiff.diff.getCreatedUnmodifiable()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 86621d6..469eeb0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -22,32 +22,23 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -55,7 +46,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -64,8 +54,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -88,7 +77,7 @@ public class TestContainerStateMachineFailures {
private static String volumeName;
private static String bucketName;
private static String path;
- private static XceiverClientManager xceiverClientManager;
+ private static int chunkSize;
/**
* Create a MiniDFSCluster for testing.
@@ -112,11 +101,6 @@ public class TestContainerStateMachineFailures {
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
TimeUnit.SECONDS);
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
- conf.setTimeDuration(
- OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
- 1, TimeUnit.SECONDS);
- conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
conf.setQuietMode(false);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -125,7 +109,6 @@ public class TestContainerStateMachineFailures {
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
- xceiverClientManager = new XceiverClientManager(conf);
volumeName = "testcontainerstatemachinefailures";
bucketName = volumeName;
objectStore.createVolume(volumeName);
@@ -149,10 +132,19 @@ public class TestContainerStateMachineFailures {
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
byte[] testData = "ratis".getBytes();
+ long written = 0;
// First write and flush creates a container in the datanode
key.write(testData);
+ written += testData.length;
key.flush();
key.write(testData);
+ written += testData.length;
+
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+ setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+ .build();
KeyOutputStream groupOutputStream =
(KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
@@ -165,14 +157,7 @@ public class TestContainerStateMachineFailures {
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath()));
- try {
- // there is only 1 datanode in the pipeline, the pipeline will be closed
- // and allocation to new pipeline will fail as there is no other dn in
- // the cluster
- key.close();
- } catch(IOException ioe) {
- Assert.assertTrue(ioe instanceof OMException);
- }
+ key.close();
long containerID = omKeyLocationInfo.getContainerID();
// Make sure the container is marked unhealthy
@@ -194,6 +179,22 @@ public class TestContainerStateMachineFailures {
.getDatanodeStateMachine().getContainer();
Assert
.assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+
+ OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+ .getBucket(bucketName).getKey("ratis");
+
+ /**
+ * Ensure length of data stored in key is equal to number of bytes written.
+ */
+ Assert.assertTrue("Number of bytes stored in the key is not equal " +
+ "to number of bytes written.", keyDetails.getDataSize() == written);
+
+ /**
+ * Pending data from the second write should get written to a new container
+ * during key.close() because the first container is UNHEALTHY by that time
+ */
+ Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+ keyDetails.getOzoneKeyLocations().size() == 2);
}
@Test
@@ -206,6 +207,12 @@ public class TestContainerStateMachineFailures {
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
+
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+ setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+ .build();
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
@@ -221,14 +228,8 @@ public class TestContainerStateMachineFailures {
(KeyValueContainerData) containerData;
// delete the container db file
FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
- try {
- // there is only 1 datanode in the pipeline, the pipeline will be closed
- // and allocation to new pipeline will fail as there is no other dn in
- // the cluster
- key.close();
- } catch(IOException ioe) {
- Assert.assertTrue(ioe instanceof OMException);
- }
+
+ key.close();
long containerID = omKeyLocationInfo.getContainerID();
@@ -269,83 +270,4 @@ public class TestContainerStateMachineFailures {
Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
dispatcher.dispatch(request.build(), null).getResult());
}
-
- @Test
- public void testApplyTransactionFailure() throws Exception {
- OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
- // First write and flush creates a container in the datanode
- key.write("ratis".getBytes());
- key.flush();
- key.write("ratis".getBytes());
- KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
- List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
- Assert.assertEquals(1, locationInfoList.size());
- OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
- ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
- Assert.assertTrue(containerData instanceof KeyValueContainerData);
- KeyValueContainerData keyValueContainerData =
- (KeyValueContainerData) containerData;
- key.close();
- ContainerStateMachine stateMachine =
- (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
- SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
- Path parentPath = storage.findLatestSnapshot().getFile().getPath();
- // Since the snapshot threshold is set to 1, since there are
- // applyTransactions, we should see snapshots
- Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
- FileInfo snapshot = storage.findLatestSnapshot().getFile();
- Assert.assertNotNull(snapshot);
- long containerID = omKeyLocationInfo.getContainerID();
- // delete the container db file
- FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
- Pipeline pipeline = cluster.getStorageContainerLocationClient()
- .getContainerWithPipeline(containerID).getPipeline();
- XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(pipeline);
- ContainerProtos.ContainerCommandRequestProto.Builder request =
- ContainerProtos.ContainerCommandRequestProto.newBuilder();
- request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
- request.setCmdType(ContainerProtos.Type.CloseContainer);
- request.setContainerID(containerID);
- request.setCloseContainer(
- ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
- // close container transaction will fail over Ratis and will initiate
- // a pipeline close action
-
- // Since the applyTransaction failure is propagated to Ratis,
- // stateMachineUpdater will it exception while taking the next snapshot
- // and should shutdown the RaftServerImpl. The client request will fail
- // with RaftRetryFailureException.
- try {
- xceiverClient.sendCommand(request.build());
- Assert.fail("Expected exception not thrown");
- } catch (IOException e) {
- Assert.assertTrue(HddsClientUtils
- .checkForException(e) instanceof RaftRetryFailureException);
- }
- // Make sure the container is marked unhealthy
- Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
- try {
- // try to take a new snapshot, ideally it should just fail
- stateMachine.takeSnapshot();
- } catch (IOException ioe) {
- Assert.assertTrue(ioe instanceof StateMachineException);
- }
- // Make sure the latest snapshot is same as the previous one
- FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
- Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
- }
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 82d34d7..4da1907 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -73,10 +73,6 @@ import org.apache.hadoop.security.token.Token;
import com.google.common.base.Preconditions;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -870,16 +866,4 @@ public final class ContainerTestHelper {
index++;
}
}
-
- public static StateMachine getStateMachine(MiniOzoneCluster cluster)
- throws Exception {
- XceiverServerSpi server =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
- getContainer().getWriteChannel();
- RaftServerProxy proxy =
- (RaftServerProxy) (((XceiverServerRatis) server).getServer());
- RaftGroupId groupId = proxy.getGroupIds().iterator().next();
- RaftServerImpl impl = proxy.getImpl(groupId);
- return impl.getStateMachine();
- }
}
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
index 545f2b3..8ed3960 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -22,7 +22,13 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.transport
+ .server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+ .XceiverServerRatis;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -121,6 +127,13 @@ public class TestFreonWithDatanodeFastRestart {
}
private StateMachine getStateMachine() throws Exception {
- return ContainerTestHelper.getStateMachine(cluster);
+ XceiverServerSpi server =
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+ getContainer().getWriteChannel();
+ RaftServerProxy proxy =
+ (RaftServerProxy)(((XceiverServerRatis)server).getServer());
+ RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+ RaftServerImpl impl = proxy.getImpl(groupId);
+ return impl.getStateMachine();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org