You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2018/10/30 06:03:16 UTC
[46/50] [abbrv] hadoop git commit: HDDS-728. Datanodes should use
different ContainerStateMachine for each pipeline. Contributed by Mukul Kumar
Singh.
HDDS-728. Datanodes should use different ContainerStateMachine for each pipeline.
Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/119cfe1d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/119cfe1d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/119cfe1d
Branch: refs/heads/HDFS-13891
Commit: 119cfe1d18d6f9b04670b7793cefe2c796c1943a
Parents: 7ba7898
Author: Nanda kumar <na...@apache.org>
Authored: Mon Oct 29 19:53:52 2018 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Oct 30 11:31:17 2018 +0530
----------------------------------------------------------------------
.../statemachine/DatanodeStateMachine.java | 3 +
.../transport/server/ratis/CSMMetrics.java | 5 +-
.../server/ratis/ContainerStateMachine.java | 21 ++-
.../server/ratis/XceiverServerRatis.java | 26 ++--
hadoop-hdds/pom.xml | 2 +-
.../hdds/scm/container/SCMContainerManager.java | 8 +-
.../hdds/scm/pipeline/TestNodeFailure.java | 2 +-
.../apache/hadoop/ozone/MiniOzoneCluster.java | 8 +-
.../hadoop/ozone/MiniOzoneClusterImpl.java | 20 +--
.../hadoop/ozone/client/rpc/TestBCSID.java | 2 +-
.../commandhandler/TestBlockDeletion.java | 4 +-
.../hadoop/ozone/web/client/TestKeys.java | 2 +-
hadoop-ozone/pom.xml | 2 +-
.../freon/TestFreonWithDatanodeFastRestart.java | 130 +++++++++++++++++++
.../freon/TestFreonWithDatanodeRestart.java | 53 +++-----
15 files changed, 208 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 85fa304..4768cf8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -120,6 +122,7 @@ public class DatanodeStateMachine implements Closeable {
.addPublisherFor(NodeReportProto.class)
.addPublisherFor(ContainerReportsProto.class)
.addPublisherFor(CommandStatusReportsProto.class)
+ .addPublisherFor(PipelineReportsProto.class)
.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index b6aed60..9ccf88a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.ratis.protocol.RaftGroupId;
/**
* This class is for maintaining Container State Machine statistics.
@@ -47,9 +48,9 @@ public class CSMMetrics {
public CSMMetrics() {
}
- public static CSMMetrics create() {
+ public static CSMMetrics create(RaftGroupId gid) {
MetricsSystem ms = DefaultMetricsSystem.instance();
- return ms.register(SOURCE_NAME,
+ return ms.register(SOURCE_NAME + gid.toString(),
"Container State Machine",
new CSMMetrics());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index bcbf93f..ac0833b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -66,7 +66,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -112,6 +111,7 @@ public class ContainerStateMachine extends BaseStateMachine {
LoggerFactory.getLogger(ContainerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
+ private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor chunkExecutor;
private final XceiverServerRatis ratisServer;
@@ -127,21 +127,19 @@ public class ContainerStateMachine extends BaseStateMachine {
*/
private final CSMMetrics metrics;
- public ContainerStateMachine(ContainerDispatcher dispatcher,
+ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
- int numOfExecutors) {
+ List<ExecutorService> executors) {
+ this.gid = gid;
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
this.ratisServer = ratisServer;
+ metrics = CSMMetrics.create(gid);
+ this.numExecutors = executors.size();
+ this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
- metrics = CSMMetrics.create();
this.createContainerFutureMap = new ConcurrentHashMap<>();
- this.numExecutors = numOfExecutors;
- executors = new ExecutorService[numExecutors];
containerCommandCompletionMap = new ConcurrentHashMap<>();
- for (int i = 0; i < numExecutors; i++) {
- executors[i] = Executors.newSingleThreadExecutor();
- }
}
@Override
@@ -207,7 +205,7 @@ public class ContainerStateMachine extends BaseStateMachine {
throws IOException {
final ContainerCommandRequestProto proto =
getRequestProto(request.getMessage().getContent());
-
+ Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
final StateMachineLogEntryProto log;
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
@@ -557,8 +555,5 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void close() throws IOException {
- for (int i = 0; i < numExecutors; i++) {
- executors[i].shutdown();
- }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b5092d9..599f821 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -76,6 +76,8 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -94,11 +96,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final int port;
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
+ private final List<ExecutorService> executors;
+ private final ContainerDispatcher dispatcher;
private ClientId clientId = ClientId.randomId();
private final StateContext context;
private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
- private ContainerStateMachine stateMachine;
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@@ -121,18 +124,22 @@ public final class XceiverServerRatis implements XceiverServerSpi {
this.replicationLevel =
conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
- stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this,
- numContainerOpExecutors);
+ this.executors = new ArrayList<>();
+ this.dispatcher = dispatcher;
+ for (int i = 0; i < numContainerOpExecutors; i++) {
+ executors.add(Executors.newSingleThreadExecutor());
+ }
+
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
- .setStateMachine(stateMachine)
+ .setStateMachineRegistry(this::getStateMachine)
.build();
}
- @VisibleForTesting
- public ContainerStateMachine getStateMachine() {
- return stateMachine;
+ private ContainerStateMachine getStateMachine(RaftGroupId gid) {
+ return new ContainerStateMachine(gid, dispatcher, chunkExecutor,
+ this, Collections.unmodifiableList(executors));
}
private RaftProperties newRaftProperties(Configuration conf) {
@@ -310,8 +317,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
@Override
public void stop() {
try {
- chunkExecutor.shutdown();
+ // Shut down the server before the executors: while it is shutting down,
+ // the server may still run some tasks on those executors.
server.close();
+ chunkExecutor.shutdown();
+ executors.forEach(ExecutorService::shutdown);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index bedf78d..f960e90 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -45,7 +45,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
<!-- Apache Ratis version -->
- <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
+ <ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 1666b7c..0f980dc1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -523,9 +523,13 @@ public class SCMContainerManager implements ContainerManager {
try {
containerStateManager.updateContainerReplica(id, replica);
ContainerInfo currentInfo = containerStateManager.getContainer(id);
- if (newInfo.getState() == LifeCycleState.CLOSING
- && currentInfo.getState() == LifeCycleState.CLOSED) {
+ if (newInfo.getState() == LifeCycleState.CLOSED
+ && currentInfo.getState() == LifeCycleState.CLOSING) {
currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
+ if (!currentInfo.isOpen()) {
+ pipelineManager.removeContainerFromPipeline(
+ currentInfo.getPipelineID(), id);
+ }
}
HddsProtos.SCMContainerInfo newState =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
index 45886c6..9a1c705 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -118,7 +118,7 @@ public class TestNodeFailure {
pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
.getPipelineState());
// Now restart the datanode and make sure that a new pipeline is created.
- cluster.restartHddsDatanode(dnToFail);
+ cluster.restartHddsDatanode(dnToFail, true);
ContainerWithPipeline ratisContainer3 =
containerManager.allocateContainer(RATIS, THREE, "testOwner");
//Assert that new container is not created from the ratis 2 pipeline
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index d13efb4..3aad7f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -156,16 +156,16 @@ public interface MiniOzoneCluster {
*
* @param i index of HddsDatanode in the MiniOzoneCluster
*/
- void restartHddsDatanode(int i) throws InterruptedException,
- TimeoutException;
+ void restartHddsDatanode(int i, boolean waitForDatanode)
+ throws InterruptedException, TimeoutException;
/**
* Restart a particular HddsDatanode.
*
* @param dn HddsDatanode in the MiniOzoneCluster
*/
- void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
- TimeoutException, IOException;
+ void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
+ throws InterruptedException, TimeoutException, IOException;
/**
* Shutdown a particular HddsDatanode.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ae52451..11bc0e0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -232,8 +232,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
}
@Override
- public void restartHddsDatanode(int i) throws InterruptedException,
- TimeoutException {
+ public void restartHddsDatanode(int i, boolean waitForDatanode)
+ throws InterruptedException, TimeoutException {
HddsDatanodeService datanodeService = hddsDatanodes.get(i);
datanodeService.stop();
datanodeService.join();
@@ -248,20 +248,24 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
hddsDatanodes.remove(i);
- // wait for node to be removed from SCM healthy node list.
- waitForClusterToBeReady();
+ if (waitForDatanode) {
+ // wait for node to be removed from SCM healthy node list.
+ waitForClusterToBeReady();
+ }
HddsDatanodeService service =
HddsDatanodeService.createHddsDatanodeService(conf);
hddsDatanodes.add(i, service);
service.start(null);
- // wait for the node to be identified as a healthy node again.
- waitForClusterToBeReady();
+ if (waitForDatanode) {
+ // wait for the node to be identified as a healthy node again.
+ waitForClusterToBeReady();
+ }
}
@Override
- public void restartHddsDatanode(DatanodeDetails dn)
+ public void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
throws InterruptedException, TimeoutException, IOException {
- restartHddsDatanode(getHddsDatanodeIndex(dn));
+ restartHddsDatanode(getHddsDatanodeIndex(dn), waitForDatanode);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
index ed4629c..98099be 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
@@ -137,7 +137,7 @@ public class TestBCSID {
omKeyLocationInfo.getBlockCommitSequenceId());
// verify that on restarting the datanode, it reloads the BCSID correctly.
- cluster.restartHddsDatanode(0);
+ cluster.restartHddsDatanode(0, true);
Assert.assertEquals(blockCommitSequenceId,
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index e4cbad5..63346d2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -177,7 +177,7 @@ public class TestBlockDeletion {
// Containers in the DN and SCM should have same delete transactionIds
// after DN restart. The assertion is just to verify that the state of
// containerInfos in dn and scm is consistent after dn restart.
- cluster.restartHddsDatanode(0);
+ cluster.restartHddsDatanode(0, true);
matchContainerTransactionIds();
// verify PENDING_DELETE_STATUS event is fired
@@ -210,7 +210,7 @@ public class TestBlockDeletion {
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("RetriableDatanodeCommand type=deleteBlocksCommand"),
500, 5000);
- cluster.restartHddsDatanode(0);
+ cluster.restartHddsDatanode(0, true);
}
private void verifyTransactionsCommitted() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 1ecedcc..08905eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -326,7 +326,7 @@ public class TestKeys {
private static void restartDatanode(MiniOzoneCluster cluster, int datanodeIdx)
throws Exception {
- cluster.restartHddsDatanode(datanodeIdx);
+ cluster.restartHddsDatanode(datanodeIdx, true);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 5e53134..2fcffab 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
<ozone.version>0.4.0-SNAPSHOT</ozone.version>
- <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
+ <ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>
<ozone.release>Badlands</ozone.release>
<declared.ozone.version>${ozone.version}</declared.ozone.version>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
new file mode 100644
index 0000000..44f6f1d
--- /dev/null
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport
+ .server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+ .XceiverServerRatis;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests Freon with Datanode restarts without waiting for pipeline to close.
+ */
+public class TestFreonWithDatanodeFastRestart {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true
+ *
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setHbProcessorInterval(1000)
+ .setHbInterval(1000)
+ .setNumDatanodes(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testRestart() throws Exception {
+ startFreon();
+ StateMachine sm = getStateMachine();
+ TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
+ cluster.restartHddsDatanode(0, false);
+ sm = getStateMachine();
+ SimpleStateMachineStorage storage =
+ (SimpleStateMachineStorage)sm.getStateMachineStorage();
+ SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
+ TermIndex termInSnapshot = snapshotInfo.getTermIndex();
+ String expectedSnapFile =
+ storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
+ termIndexBeforeRestart.getIndex()).getAbsolutePath();
+ Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
+ expectedSnapFile);
+ Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
+
+ // After restart the term index might have progressed to apply pending
+ // transactions.
+ TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
+ Assert.assertTrue(termIndexAfterRestart.getIndex() >=
+ termIndexBeforeRestart.getIndex());
+ startFreon();
+ }
+
+ private void startFreon() throws Exception {
+ RandomKeyGenerator randomKeyGenerator =
+ new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+ randomKeyGenerator.setNumOfVolumes(1);
+ randomKeyGenerator.setNumOfBuckets(1);
+ randomKeyGenerator.setNumOfKeys(1);
+ randomKeyGenerator.setType(ReplicationType.RATIS);
+ randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+ randomKeyGenerator.setKeySize(20971520);
+ randomKeyGenerator.setValidateWrites(true);
+ randomKeyGenerator.call();
+ Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
+ Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
+ Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
+ Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
+ }
+
+ private StateMachine getStateMachine() throws Exception {
+ XceiverServerSpi server =
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+ getContainer().getServer(HddsProtos.ReplicationType.RATIS);
+ RaftServerProxy proxy =
+ (RaftServerProxy)(((XceiverServerRatis)server).getServer());
+ RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+ RaftServerImpl impl = proxy.getImpl(groupId);
+ return impl.getStateMachine();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
index a1c50b6..7cb53d3 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
@@ -18,17 +18,11 @@
package org.apache.hadoop.ozone.freon;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -36,7 +30,10 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests Freon with Datanode restarts.
@@ -56,6 +53,12 @@ public class TestFreonWithDatanodeRestart {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+ TimeUnit.SECONDS);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
+ TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
+ TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setHbProcessorInterval(1000)
.setHbInterval(1000)
@@ -76,6 +79,12 @@ public class TestFreonWithDatanodeRestart {
@Test
public void testRestart() throws Exception {
+ startFreon();
+ cluster.restartHddsDatanode(0, true);
+ startFreon();
+ }
+
+ private void startFreon() throws Exception {
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
randomKeyGenerator.setNumOfVolumes(1);
@@ -90,33 +99,5 @@ public class TestFreonWithDatanodeRestart {
Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
-
- ContainerStateMachine sm = getStateMachine();
- TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
- cluster.restartHddsDatanode(0);
- sm = getStateMachine();
- SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage)sm.getStateMachineStorage();
- SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
- TermIndex termInSnapshot = snapshotInfo.getTermIndex();
- String expectedSnapFile =
- storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
- termIndexBeforeRestart.getIndex()).getAbsolutePath();
- Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
- expectedSnapFile);
- Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
-
- // After restart the term index might have progressed to apply pending
- // transactions.
- TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
- Assert.assertTrue(termIndexAfterRestart.getIndex() >=
- termIndexBeforeRestart.getIndex());
- }
-
- private ContainerStateMachine getStateMachine() {
- XceiverServerSpi server =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
- getContainer().getServer(HddsProtos.ReplicationType.RATIS);
- return ((XceiverServerRatis)server).getStateMachine();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org