You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/07 18:20:38 UTC
hadoop git commit: HDDS-400. Check global replication state for
containers of dead node. Contributed by Elek, Marton.
Repository: hadoop
Updated Branches:
refs/heads/trunk ff64d3571 -> ab90248b3
HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab90248b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab90248b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab90248b
Branch: refs/heads/trunk
Commit: ab90248b30c2355cd8ae6660ea8af9758f95356a
Parents: ff64d35
Author: Hanisha Koneru <ha...@apache.org>
Authored: Fri Sep 7 11:20:25 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Fri Sep 7 11:20:25 2018 -0700
----------------------------------------------------------------------
.../scm/container/ContainerReportHandler.java | 49 +--
.../scm/container/ContainerStateManager.java | 38 +-
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 12 +
.../org/apache/hadoop/hdds/scm/TestUtils.java | 45 ++
.../container/TestContainerReportHandler.java | 20 +-
.../container/TestContainerStateManager.java | 96 +++++
.../hdds/scm/node/TestDeadNodeHandler.java | 95 +++--
.../container/TestContainerStateManager.java | 415 -------------------
.../TestContainerStateManagerIntegration.java | 415 +++++++++++++++++++
9 files changed, 694 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 5ca2bcb..dcbd49c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
@@ -23,18 +22,16 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.replication
- .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
- .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -59,22 +56,21 @@ public class ContainerReportHandler implements
private ReplicationActivityStatus replicationStatus;
-
public ContainerReportHandler(Mapping containerMapping,
Node2ContainerMap node2ContainerMap,
ReplicationActivityStatus replicationActivityStatus) {
Preconditions.checkNotNull(containerMapping);
Preconditions.checkNotNull(node2ContainerMap);
Preconditions.checkNotNull(replicationActivityStatus);
+ this.containerStateManager = containerMapping.getStateManager();
this.containerMapping = containerMapping;
this.node2ContainerMap = node2ContainerMap;
- this.containerStateManager = containerMapping.getStateManager();
this.replicationStatus = replicationActivityStatus;
}
@Override
public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
- EventPublisher publisher) {
+ EventPublisher publisher) {
DatanodeDetails datanodeOrigin =
containerReportFromDatanode.getDatanodeDetails();
@@ -88,7 +84,8 @@ public class ContainerReportHandler implements
.processContainerReports(datanodeOrigin, containerReport, false);
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
- .map(containerProto -> containerProto.getContainerID())
+ .map(StorageContainerDatanodeProtocolProtos
+ .ContainerInfo::getContainerID)
.map(ContainerID::new)
.collect(Collectors.toSet());
@@ -102,13 +99,12 @@ public class ContainerReportHandler implements
for (ContainerID containerID : reportResult.getMissingContainers()) {
containerStateManager
.removeContainerReplica(containerID, datanodeOrigin);
- emitReplicationRequestEvent(containerID, publisher);
+ checkReplicationState(containerID, publisher);
}
for (ContainerID containerID : reportResult.getNewContainers()) {
containerStateManager.addContainerReplica(containerID, datanodeOrigin);
-
- emitReplicationRequestEvent(containerID, publisher);
+ checkReplicationState(containerID, publisher);
}
} catch (IOException e) {
@@ -119,8 +115,9 @@ public class ContainerReportHandler implements
}
- private void emitReplicationRequestEvent(ContainerID containerID,
- EventPublisher publisher) throws SCMException {
+ private void checkReplicationState(ContainerID containerID,
+ EventPublisher publisher)
+ throws SCMException {
ContainerInfo container = containerStateManager.getContainer(containerID);
if (container == null) {
@@ -134,18 +131,18 @@ public class ContainerReportHandler implements
if (container.isContainerOpen()) {
return;
}
- if (replicationStatus.isReplicationEnabled()) {
-
- int existingReplicas =
- containerStateManager.getContainerReplicas(containerID).size();
-
- int expectedReplicas = container.getReplicationFactor().getNumber();
-
- if (existingReplicas != expectedReplicas) {
+ ReplicationRequest replicationState =
+ containerStateManager.checkReplicationState(containerID);
+ if (replicationState != null) {
+ if (replicationStatus.isReplicationEnabled()) {
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(containerID.getId(), existingReplicas,
- container.getReplicationFactor().getNumber()));
+ replicationState);
+ } else {
+ LOG.warn(
+ "Over/under replicated container but the replication is not "
+ + "(yet) enabled: "
+ + replicationState.toString());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 421d34e..eb8f2e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,7 +151,7 @@ public class ContainerStateManager implements Closeable {
finalStates);
initializeStateMachine();
- this.containerSize =(long)configuration.getStorageSize(
+ this.containerSize = (long) configuration.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
@@ -399,7 +402,7 @@ public class ContainerStateManager implements Closeable {
// container ID.
ContainerState key = new ContainerState(owner, type, factor);
ContainerID lastID = lastUsedMap.get(key);
- if(lastID == null) {
+ if (lastID == null) {
lastID = matchingSet.first();
}
@@ -426,7 +429,7 @@ public class ContainerStateManager implements Closeable {
selectedContainer = findContainerWithSpace(size, resultSet, owner);
}
// Update the allocated Bytes on this container.
- if(selectedContainer != null) {
+ if (selectedContainer != null) {
selectedContainer.updateAllocatedBytes(size);
}
return selectedContainer;
@@ -539,9 +542,36 @@ public class ContainerStateManager implements Closeable {
DatanodeDetails dn) throws SCMException {
return containers.removeContainerReplica(containerID, dn);
}
-
+
+ /**
+ * Compare the existing replication number with the expected one.
+ */
+ public ReplicationRequest checkReplicationState(ContainerID containerID)
+ throws SCMException {
+ int existingReplicas = getContainerReplicas(containerID).size();
+ int expectedReplicas = getContainer(containerID)
+ .getReplicationFactor().getNumber();
+ if (existingReplicas != expectedReplicas) {
+ return new ReplicationRequest(containerID.getId(), existingReplicas,
+ expectedReplicas);
+ }
+ return null;
+ }
+
+ /**
+ * Checks if the container is open.
+ */
+ public boolean isOpen(ContainerID containerID) {
+ Preconditions.checkNotNull(containerID);
+ ContainerInfo container = Preconditions
+ .checkNotNull(getContainer(containerID),
+ "Container can't be found " + containerID);
+ return container.isContainerOpen();
+ }
+
@VisibleForTesting
public ContainerStateMap getContainerStateMap() {
return containers;
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index c853b3b..d694a10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -23,6 +23,8 @@ import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -62,6 +64,16 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
try {
containerStateManager.removeContainerReplica(container,
datanodeDetails);
+
+ if (!containerStateManager.isOpen(container)) {
+ ReplicationRequest replicationRequest =
+ containerStateManager.checkReplicationState(container);
+
+ if (replicationRequest != null) {
+ publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ replicationRequest);
+ }
+ }
} catch (SCMException e) {
LOG.error("Can't remove container from containerStateMap {}", container
.getId(), e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index d617680..7af9dda 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,11 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
@@ -31,12 +36,18 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -376,5 +387,39 @@ public final class TestUtils {
return report.build();
}
+ public static
+ org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+ allocateContainer(ContainerStateManager containerStateManager)
+ throws IOException {
+
+ PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+
+ Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.ReplicationType.STAND_ALONE,
+ HddsProtos.ReplicationFactor.THREE,
+ PipelineID.randomId());
+ when(pipelineSelector
+ .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE,
+ HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline);
+
+ return containerStateManager
+ .allocateContainer(pipelineSelector,
+ HddsProtos.ReplicationType.STAND_ALONE,
+ HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo();
+
+ }
+
+ public static void closeContainer(ContainerStateManager containerStateManager,
+ org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+ container)
+ throws SCMException {
+
+ containerStateManager.getContainerStateMap()
+ .updateState(container, container.getState(), LifeCycleState.CLOSING);
+
+ containerStateManager.getContainerStateMap()
+ .updateState(container, container.getState(), LifeCycleState.CLOSED);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 66f0966..d74a32f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -71,9 +71,7 @@ public class TestContainerReportHandler implements EventPublisher {
@Test
public void test() throws IOException {
-
- //given
-
+ //GIVEN
OzoneConfiguration conf = new OzoneConfiguration();
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
Mapping mapping = Mockito.mock(Mapping.class);
@@ -133,19 +131,9 @@ public class TestContainerReportHandler implements EventPublisher {
long c3 = cont3.getContainerID();
// Close remaining containers
- try {
- containerStateManager.getContainerStateMap()
- .updateState(cont1, cont1.getState(), LifeCycleState.CLOSING);
- containerStateManager.getContainerStateMap()
- .updateState(cont1, cont1.getState(), LifeCycleState.CLOSED);
- containerStateManager.getContainerStateMap()
- .updateState(cont2, cont2.getState(), LifeCycleState.CLOSING);
- containerStateManager.getContainerStateMap()
- .updateState(cont2, cont2.getState(), LifeCycleState.CLOSED);
-
- } catch (IOException e) {
- LOG.info("Failed to change state of open containers.", e);
- }
+ TestUtils.closeContainer(containerStateManager, cont1);
+ TestUtils.closeContainer(containerStateManager, cont2);
+
//when
//initial reports before replication is enabled. 2 containers w 3 replicas.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
new file mode 100644
index 0000000..fe92ee5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Testing ContainerStatemanager.
+ */
+public class TestContainerStateManager {
+
+ private ContainerStateManager containerStateManager;
+
+ @Before
+ public void init() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ Mapping mapping = Mockito.mock(Mapping.class);
+ containerStateManager = new ContainerStateManager(conf, mapping);
+
+ }
+
+ @Test
+ public void checkReplicationStateOK() throws IOException {
+ //GIVEN
+ ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
+
+ DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails d3 = TestUtils.randomDatanodeDetails();
+
+ addReplica(c1, d1);
+ addReplica(c1, d2);
+ addReplica(c1, d3);
+
+ //WHEN
+ ReplicationRequest replicationRequest = containerStateManager
+ .checkReplicationState(new ContainerID(c1.getContainerID()));
+
+ //THEN
+ Assert.assertNull(replicationRequest);
+ }
+
+ @Test
+ public void checkReplicationStateMissingReplica() throws IOException {
+ //GIVEN
+
+ ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
+
+ DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
+
+ addReplica(c1, d1);
+ addReplica(c1, d2);
+
+ //WHEN
+ ReplicationRequest replicationRequest = containerStateManager
+ .checkReplicationState(new ContainerID(c1.getContainerID()));
+
+ Assert
+ .assertEquals(c1.getContainerID(), replicationRequest.getContainerId());
+ Assert.assertEquals(2, replicationRequest.getReplicationCount());
+ Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+ }
+
+ private void addReplica(ContainerInfo c1, DatanodeDetails d1) {
+ containerStateManager
+ .addContainerReplica(new ContainerID(c1.getContainerID()), d1);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 4be10e1..0b69f5f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -18,76 +18,76 @@
package org.apache.hadoop.hdds.scm.node;
-import java.util.HashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.eq;
import org.mockito.Mockito;
/**
* Test DeadNodeHandler.
*/
public class TestDeadNodeHandler {
+
+ private List<ReplicationRequest> sentEvents = new ArrayList<>();
+
@Test
- public void testOnMessage() throws SCMException {
+ public void testOnMessage() throws IOException {
//GIVEN
DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
- ContainerInfo container1 = TestUtils.getRandomContainerInfo(1);
- ContainerInfo container2 = TestUtils.getRandomContainerInfo(2);
- ContainerInfo container3 = TestUtils.getRandomContainerInfo(3);
-
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
ContainerStateManager containerStateManager = new ContainerStateManager(
new OzoneConfiguration(),
Mockito.mock(Mapping.class)
);
+
+ ContainerInfo container1 =
+ TestUtils.allocateContainer(containerStateManager);
+ ContainerInfo container2 =
+ TestUtils.allocateContainer(containerStateManager);
+ ContainerInfo container3 =
+ TestUtils.allocateContainer(containerStateManager);
+
DeadNodeHandler handler =
new DeadNodeHandler(node2ContainerMap, containerStateManager);
- node2ContainerMap
- .insertNewDatanode(datanode1.getUuid(), new HashSet<ContainerID>() {{
- add(new ContainerID(container1.getContainerID()));
- add(new ContainerID(container2.getContainerID()));
- }});
+ registerReplicas(node2ContainerMap, datanode1, container1, container2);
+ registerReplicas(node2ContainerMap, datanode2, container1, container3);
- node2ContainerMap
- .insertNewDatanode(datanode2.getUuid(), new HashSet<ContainerID>() {{
- add(new ContainerID(container1.getContainerID()));
- add(new ContainerID(container3.getContainerID()));
- }});
+ registerReplicas(containerStateManager, container1, datanode1, datanode2);
+ registerReplicas(containerStateManager, container2, datanode1);
+ registerReplicas(containerStateManager, container3, datanode2);
- containerStateManager.getContainerStateMap()
- .addContainerReplica(new ContainerID(container1.getContainerID()),
- datanode1, datanode2);
+ TestUtils.closeContainer(containerStateManager, container1);
- containerStateManager.getContainerStateMap()
- .addContainerReplica(new ContainerID(container2.getContainerID()),
- datanode1);
-
- containerStateManager.getContainerStateMap()
- .addContainerReplica(new ContainerID(container3.getContainerID()),
- datanode2);
+ EventPublisher publisher = Mockito.mock(EventPublisher.class);
//WHEN datanode1 is dead
- handler.onMessage(datanode1, Mockito.mock(EventPublisher.class));
+ handler.onMessage(datanode1, publisher);
//THEN
-
//node2ContainerMap has not been changed
Assert.assertEquals(2, node2ContainerMap.size());
@@ -108,5 +108,40 @@ public class TestDeadNodeHandler {
Assert.assertEquals(1, container3Replicas.size());
Assert.assertEquals(datanode2, container3Replicas.iterator().next());
+ ArgumentCaptor<ReplicationRequest> replicationRequestParameter =
+ ArgumentCaptor.forClass(ReplicationRequest.class);
+
+ Mockito.verify(publisher)
+ .fireEvent(eq(SCMEvents.REPLICATE_CONTAINER),
+ replicationRequestParameter.capture());
+
+ Assert
+ .assertEquals(container1.getContainerID(),
+ replicationRequestParameter.getValue().getContainerId());
+ Assert
+ .assertEquals(1,
+ replicationRequestParameter.getValue().getReplicationCount());
+ Assert
+ .assertEquals(3,
+ replicationRequestParameter.getValue().getExpecReplicationCount());
+ }
+
+ private void registerReplicas(ContainerStateManager containerStateManager,
+ ContainerInfo container, DatanodeDetails... datanodes) {
+ containerStateManager.getContainerStateMap()
+ .addContainerReplica(new ContainerID(container.getContainerID()),
+ datanodes);
}
+
+ private void registerReplicas(Node2ContainerMap node2ContainerMap,
+ DatanodeDetails datanode,
+ ContainerInfo... containers)
+ throws SCMException {
+ node2ContainerMap
+ .insertNewDatanode(datanode.getUuid(),
+ Arrays.stream(containers)
+ .map(container -> new ContainerID(container.getContainerID()))
+ .collect(Collectors.toSet()));
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
deleted file mode 100644
index 9e209af..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.primitives.Longs;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.Random;
-import org.slf4j.event.Level;
-
-/**
- * Tests for ContainerStateManager.
- */
-public class TestContainerStateManager {
-
- private OzoneConfiguration conf;
- private MiniOzoneCluster cluster;
- private XceiverClientManager xceiverClientManager;
- private StorageContainerManager scm;
- private Mapping scmContainerMapping;
- private ContainerStateManager containerStateManager;
- private String containerOwner = "OZONE";
-
-
- @Before
- public void setup() throws Exception {
- conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
- cluster.waitForClusterToBeReady();
- xceiverClientManager = new XceiverClientManager(conf);
- scm = cluster.getStorageContainerManager();
- scmContainerMapping = scm.getScmContainerManager();
- containerStateManager = scmContainerMapping.getStateManager();
- }
-
- @After
- public void cleanUp() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testAllocateContainer() throws IOException {
- // Allocate a container and verify the container info
- ContainerWithPipeline container1 = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- ContainerInfo info = containerStateManager
- .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED);
- Assert.assertEquals(container1.getContainerInfo().getContainerID(),
- info.getContainerID());
- Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
- Assert.assertEquals(containerOwner, info.getOwner());
- Assert.assertEquals(xceiverClientManager.getType(),
- info.getReplicationType());
- Assert.assertEquals(xceiverClientManager.getFactor(),
- info.getReplicationFactor());
- Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
-
- // Check there are two containers in ALLOCATED state after allocation
- ContainerWithPipeline container2 = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- int numContainers = containerStateManager
- .getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED).size();
- Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
- container2.getContainerInfo().getContainerID());
- Assert.assertEquals(2, numContainers);
- }
-
- @Test
- public void testContainerStateManagerRestart() throws IOException {
- // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
-
- List<ContainerInfo> containers = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- ContainerWithPipeline container = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- containers.add(container.getContainerInfo());
- if (i >= 5) {
- scm.getScmContainerManager().updateContainerState(container
- .getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- }
- }
-
- // New instance of ContainerStateManager should load all the containers in
- // container store.
- ContainerStateManager stateManager =
- new ContainerStateManager(conf, scmContainerMapping
- );
- int matchCount = stateManager
- .getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED).size();
- Assert.assertEquals(5, matchCount);
- matchCount = stateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.CREATING).size();
- Assert.assertEquals(5, matchCount);
- }
-
- @Test
- public void testGetMatchingContainer() throws IOException {
- ContainerWithPipeline container1 = scm.getClientProtocolServer().
- allocateContainer(xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
-
- ContainerWithPipeline container2 = scm.getClientProtocolServer().
- allocateContainer(xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
-
- ContainerInfo info = containerStateManager
- .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container1.getContainerInfo().getContainerID(),
- info.getContainerID());
-
- info = containerStateManager
- .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED);
- Assert.assertEquals(container2.getContainerInfo().getContainerID(),
- info.getContainerID());
-
- scmContainerMapping
- .updateContainerState(container2.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping
- .updateContainerState(container2.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
-
- // space has already been allocated in container1, now container 2 should
- // be chosen.
- info = containerStateManager
- .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container2.getContainerInfo().getContainerID(),
- info.getContainerID());
- }
-
- @Test
- public void testUpdateContainerState() throws IOException {
- NavigableSet<ContainerID> containerList = containerStateManager
- .getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED);
- int containers = containerList == null ? 0 : containerList.size();
- Assert.assertEquals(0, containers);
-
- // Allocate container1 and update its state from ALLOCATED -> CREATING ->
- // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
- ContainerWithPipeline container1 = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.ALLOCATED).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.CREATING).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.OPEN).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.FINALIZE);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.CLOSING).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CLOSE);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.CLOSED).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.DELETE);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.DELETING).size();
- Assert.assertEquals(1, containers);
-
- scmContainerMapping
- .updateContainerState(container1.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CLEANUP);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.DELETED).size();
- Assert.assertEquals(1, containers);
-
- // Allocate container1 and update its state from ALLOCATED -> CREATING ->
- // DELETING
- ContainerWithPipeline container2 = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping
- .updateContainerState(container2.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping
- .updateContainerState(container2.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.TIMEOUT);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.DELETING).size();
- Assert.assertEquals(1, containers);
-
- // Allocate container1 and update its state from ALLOCATED -> CREATING ->
- // OPEN -> CLOSING -> CLOSED
- ContainerWithPipeline container3 = scm.getClientProtocolServer()
- .allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping
- .updateContainerState(container3.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping
- .updateContainerState(container3.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
- scmContainerMapping
- .updateContainerState(container3.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.FINALIZE);
- scmContainerMapping
- .updateContainerState(container3.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CLOSE);
- containers = containerStateManager.getMatchingContainerIDs(containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.CLOSED).size();
- Assert.assertEquals(1, containers);
- }
-
- @Test
- public void testUpdatingAllocatedBytes() throws Exception {
- ContainerWithPipeline container1 = scm.getClientProtocolServer()
- .allocateContainer(xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping.updateContainerState(container1
- .getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping.updateContainerState(container1
- .getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
-
- Random ran = new Random();
- long allocatedSize = 0;
- for (int i = 0; i<5; i++) {
- long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
- allocatedSize += size;
- // trigger allocating bytes by calling getMatchingContainer
- ContainerInfo info = containerStateManager
- .getMatchingContainer(size, containerOwner,
- xceiverClientManager.getType(), xceiverClientManager.getFactor(),
- HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container1.getContainerInfo().getContainerID(),
- info.getContainerID());
-
- ContainerMapping containerMapping =
- (ContainerMapping) scmContainerMapping;
- // manually trigger a flush, this will persist the allocated bytes value
- // to disk
- containerMapping.flushContainerInfo();
-
- // the persisted value should always be equal to allocated size.
- byte[] containerBytes = containerMapping.getContainerStore().get(
- Longs.toByteArray(container1.getContainerInfo().getContainerID()));
- HddsProtos.SCMContainerInfo infoProto =
- HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
- ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
- Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
- }
- }
-
- @Test
- public void testReplicaMap() throws Exception {
- GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(ContainerStateMap.getLOG());
- DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
- .setIpAddress("1.1.1.1")
- .setUuid(UUID.randomUUID().toString()).build();
- DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
- .setIpAddress("2.2.2.2")
- .setUuid(UUID.randomUUID().toString()).build();
-
- // Test 1: no replica's exist
- ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
- Set<DatanodeDetails> replicaSet;
- LambdaTestUtils.intercept(SCMException.class, "", () -> {
- containerStateManager.getContainerReplicas(containerID);
- });
-
- // Test 2: Add replica nodes and then test
- containerStateManager.addContainerReplica(containerID, dn1);
- containerStateManager.addContainerReplica(containerID, dn2);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(2, replicaSet.size());
- Assert.assertTrue(replicaSet.contains(dn1));
- Assert.assertTrue(replicaSet.contains(dn2));
-
- // Test 3: Remove one replica node and then test
- containerStateManager.removeContainerReplica(containerID, dn1);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(1, replicaSet.size());
- Assert.assertFalse(replicaSet.contains(dn1));
- Assert.assertTrue(replicaSet.contains(dn2));
-
- // Test 3: Remove second replica node and then test
- containerStateManager.removeContainerReplica(containerID, dn2);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(0, replicaSet.size());
- Assert.assertFalse(replicaSet.contains(dn1));
- Assert.assertFalse(replicaSet.contains(dn2));
-
- // Test 4: Re-insert dn1
- containerStateManager.addContainerReplica(containerID, dn1);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(1, replicaSet.size());
- Assert.assertTrue(replicaSet.contains(dn1));
- Assert.assertFalse(replicaSet.contains(dn2));
-
- // Re-insert dn2
- containerStateManager.addContainerReplica(containerID, dn2);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(2, replicaSet.size());
- Assert.assertTrue(replicaSet.contains(dn1));
- Assert.assertTrue(replicaSet.contains(dn2));
-
- Assert.assertFalse(logCapturer.getOutput().contains(
- "ReplicaMap already contains entry for container Id: " + containerID
- .toString() + ",DataNode: " + dn1.toString()));
- // Re-insert dn1
- containerStateManager.addContainerReplica(containerID, dn1);
- replicaSet = containerStateManager.getContainerReplicas(containerID);
- Assert.assertEquals(2, replicaSet.size());
- Assert.assertTrue(replicaSet.contains(dn1));
- Assert.assertTrue(replicaSet.contains(dn2));
- Assert.assertTrue(logCapturer.getOutput().contains(
- "ReplicaMap already contains entry for container Id: " + containerID
- .toString() + ",DataNode: " + dn1.toString()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
new file mode 100644
index 0000000..c6e819b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.primitives.Longs;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import org.slf4j.event.Level;
+
+/**
+ * Tests for ContainerStateManager.
+ */
+public class TestContainerStateManagerIntegration {
+
+ private OzoneConfiguration conf;
+ private MiniOzoneCluster cluster;
+ private XceiverClientManager xceiverClientManager;
+ private StorageContainerManager scm;
+ private Mapping scmContainerMapping;
+ private ContainerStateManager containerStateManager;
+ private String containerOwner = "OZONE";
+
+
+ @Before
+ public void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+ cluster.waitForClusterToBeReady();
+ xceiverClientManager = new XceiverClientManager(conf);
+ scm = cluster.getStorageContainerManager();
+ scmContainerMapping = scm.getScmContainerManager();
+ containerStateManager = scmContainerMapping.getStateManager();
+ }
+
+ @After
+ public void cleanUp() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testAllocateContainer() throws IOException {
+ // Allocate a container and verify the container info
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ ContainerInfo info = containerStateManager
+ .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED);
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
+ Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
+ Assert.assertEquals(containerOwner, info.getOwner());
+ Assert.assertEquals(xceiverClientManager.getType(),
+ info.getReplicationType());
+ Assert.assertEquals(xceiverClientManager.getFactor(),
+ info.getReplicationFactor());
+ Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
+
+ // Check there are two containers in ALLOCATED state after allocation
+ ContainerWithPipeline container2 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ int numContainers = containerStateManager
+ .getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED).size();
+ Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
+ container2.getContainerInfo().getContainerID());
+ Assert.assertEquals(2, numContainers);
+ }
+
+ @Test
+ public void testContainerStateManagerRestart() throws IOException {
+ // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
+
+ List<ContainerInfo> containers = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ ContainerWithPipeline container = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ containers.add(container.getContainerInfo());
+ if (i >= 5) {
+ scm.getScmContainerManager().updateContainerState(container
+ .getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ }
+ }
+
+ // New instance of ContainerStateManager should load all the containers in
+ // container store.
+ ContainerStateManager stateManager =
+ new ContainerStateManager(conf, scmContainerMapping
+ );
+ int matchCount = stateManager
+ .getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED).size();
+ Assert.assertEquals(5, matchCount);
+ matchCount = stateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.CREATING).size();
+ Assert.assertEquals(5, matchCount);
+ }
+
+ @Test
+ public void testGetMatchingContainer() throws IOException {
+ ContainerWithPipeline container1 = scm.getClientProtocolServer().
+ allocateContainer(xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ ContainerWithPipeline container2 = scm.getClientProtocolServer().
+ allocateContainer(xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+
+ ContainerInfo info = containerStateManager
+ .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.OPEN);
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
+
+ info = containerStateManager
+ .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED);
+ Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+ info.getContainerID());
+
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ // space has already been allocated in container1, now container 2 should
+ // be chosen.
+ info = containerStateManager
+ .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.OPEN);
+ Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+ info.getContainerID());
+ }
+
+ @Test
+ public void testUpdateContainerState() throws IOException {
+ NavigableSet<ContainerID> containerList = containerStateManager
+ .getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED);
+ int containers = containerList == null ? 0 : containerList.size();
+ Assert.assertEquals(0, containers);
+
+ // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+ // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.ALLOCATED).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.CREATING).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.OPEN).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.CLOSING).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.CLOSED).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.DELETE);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.DELETING).size();
+ Assert.assertEquals(1, containers);
+
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CLEANUP);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.DELETED).size();
+ Assert.assertEquals(1, containers);
+
+ // Allocate container2 and update its state from ALLOCATED -> CREATING ->
+ // DELETING
+ ContainerWithPipeline container2 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.TIMEOUT);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.DELETING).size();
+ Assert.assertEquals(1, containers);
+
+ // Allocate container3 and update its state from ALLOCATED -> CREATING ->
+ // OPEN -> CLOSING -> CLOSED
+ ContainerWithPipeline container3 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.CLOSED).size();
+ Assert.assertEquals(1, containers);
+ }
+
+ @Test
+ public void testUpdatingAllocatedBytes() throws Exception {
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping.updateContainerState(container1
+ .getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping.updateContainerState(container1
+ .getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ Random ran = new Random();
+ long allocatedSize = 0;
+ for (int i = 0; i<5; i++) {
+ long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
+ allocatedSize += size;
+ // trigger allocating bytes by calling getMatchingContainer
+ ContainerInfo info = containerStateManager
+ .getMatchingContainer(size, containerOwner,
+ xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+ HddsProtos.LifeCycleState.OPEN);
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
+
+ ContainerMapping containerMapping =
+ (ContainerMapping) scmContainerMapping;
+ // manually trigger a flush, this will persist the allocated bytes value
+ // to disk
+ containerMapping.flushContainerInfo();
+
+ // the persisted value should always be equal to allocated size.
+ byte[] containerBytes = containerMapping.getContainerStore().get(
+ Longs.toByteArray(container1.getContainerInfo().getContainerID()));
+ HddsProtos.SCMContainerInfo infoProto =
+ HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
+ ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
+ Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
+ }
+ }
+
+ /**
+ * Exercises the replica map of ContainerStateManager: lookup of a container
+ * with no replicas, add/remove of replica datanodes, and the DEBUG log line
+ * emitted when a duplicate replica entry is inserted.
+ */
+ @Test
+ public void testReplicaMap() throws Exception {
+ // Capture ContainerStateMap's log output at DEBUG so the duplicate-entry
+ // message can be asserted on at the end of the test.
+ GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(ContainerStateMap.getLOG());
+ DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
+ .setIpAddress("1.1.1.1")
+ .setUuid(UUID.randomUUID().toString()).build();
+ DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
+ .setIpAddress("2.2.2.2")
+ .setUuid(UUID.randomUUID().toString()).build();
+
+ // Test 1: no replicas exist yet, so the lookup must fail with SCMException.
+ ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
+ Set<DatanodeDetails> replicaSet;
+ LambdaTestUtils.intercept(SCMException.class, "", () -> {
+ containerStateManager.getContainerReplicas(containerID);
+ });
+
+ // Test 2: add both replica nodes and verify both are returned.
+ containerStateManager.addContainerReplica(containerID, dn1);
+ containerStateManager.addContainerReplica(containerID, dn2);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(2, replicaSet.size());
+ Assert.assertTrue(replicaSet.contains(dn1));
+ Assert.assertTrue(replicaSet.contains(dn2));
+
+ // Test 3: remove the first replica node; only dn2 should remain.
+ containerStateManager.removeContainerReplica(containerID, dn1);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(1, replicaSet.size());
+ Assert.assertFalse(replicaSet.contains(dn1));
+ Assert.assertTrue(replicaSet.contains(dn2));
+
+ // Test 4: remove the second replica node; the replica set becomes empty.
+ containerStateManager.removeContainerReplica(containerID, dn2);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(0, replicaSet.size());
+ Assert.assertFalse(replicaSet.contains(dn1));
+ Assert.assertFalse(replicaSet.contains(dn2));
+
+ // Test 5: re-insert dn1 after full removal; only dn1 is present again.
+ containerStateManager.addContainerReplica(containerID, dn1);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(1, replicaSet.size());
+ Assert.assertTrue(replicaSet.contains(dn1));
+ Assert.assertFalse(replicaSet.contains(dn2));
+
+ // Re-insert dn2 so both replicas are registered once more.
+ containerStateManager.addContainerReplica(containerID, dn2);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(2, replicaSet.size());
+ Assert.assertTrue(replicaSet.contains(dn1));
+ Assert.assertTrue(replicaSet.contains(dn2));
+
+ // Sanity check: the duplicate-entry message has not been logged yet.
+ Assert.assertFalse(logCapturer.getOutput().contains(
+ "ReplicaMap already contains entry for container Id: " + containerID
+ .toString() + ",DataNode: " + dn1.toString()));
+ // Test 6: re-insert dn1 while it is already present. The replica set is
+ // unchanged, but the duplicate insertion must be logged at DEBUG.
+ containerStateManager.addContainerReplica(containerID, dn1);
+ replicaSet = containerStateManager.getContainerReplicas(containerID);
+ Assert.assertEquals(2, replicaSet.size());
+ Assert.assertTrue(replicaSet.contains(dn1));
+ Assert.assertTrue(replicaSet.contains(dn2));
+ Assert.assertTrue(logCapturer.getOutput().contains(
+ "ReplicaMap already contains entry for container Id: " + containerID
+ .toString() + ",DataNode: " + dn1.toString()));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org