You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/10 23:32:17 UTC
[17/25] hadoop git commit: HDDS-245. Handle ContainerReports in the
SCM. Contributed by Elek Marton.
HDDS-245. Handle ContainerReports in the SCM. Contributed by Elek Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5dbbfe2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5dbbfe2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5dbbfe2
Branch: refs/heads/HDFS-12943
Commit: f5dbbfe2e97a8c11e3df0f95ae4a493f11fdbc28
Parents: b2517dd
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Aug 9 16:55:13 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Aug 9 16:55:39 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/server/events/EventQueue.java | 7 +-
.../scm/container/ContainerReportHandler.java | 107 +++++-
.../replication/ReplicationActivityStatus.java | 86 +++++
.../ReplicationActivityStatusMXBean.java | 28 ++
.../replication/ReplicationRequest.java | 28 +-
.../hadoop/hdds/scm/events/SCMEvents.java | 9 +
.../hdds/scm/node/states/Node2ContainerMap.java | 10 +-
.../hdds/scm/node/states/ReportResult.java | 18 +-
.../scm/server/StorageContainerManager.java | 27 +-
.../container/TestContainerReportHandler.java | 228 +++++++++++++
.../scm/node/states/Node2ContainerMapTest.java | 308 -----------------
.../scm/node/states/TestNode2ContainerMap.java | 328 +++++++++++++++++++
12 files changed, 859 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index f93c54b..b2b0df2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -147,7 +147,12 @@ public class EventQueue implements EventPublisher, AutoCloseable {
for (EventHandler handler : executorAndHandlers.getValue()) {
queuedCount.incrementAndGet();
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delivering event {} to executor/handler {}: {}",
+ event.getName(),
+ executorAndHandlers.getKey().getName(),
+ payload);
+ }
executorAndHandlers.getKey()
.onMessage(handler, payload, this);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 486162e..b26eed2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -18,30 +18,131 @@
package org.apache.hadoop.hdds.scm.container;
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication
+ .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Handles container reports from datanode.
*/
public class ContainerReportHandler implements
EventHandler<ContainerReportFromDatanode> {
- private final Mapping containerMapping;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerReportHandler.class);
+
private final Node2ContainerMap node2ContainerMap;
+ private final Mapping containerMapping;
+
+ private ContainerStateManager containerStateManager;
+
+ private ReplicationActivityStatus replicationStatus;
+
+
public ContainerReportHandler(Mapping containerMapping,
- Node2ContainerMap node2ContainerMap) {
+ Node2ContainerMap node2ContainerMap,
+ ReplicationActivityStatus replicationActivityStatus) {
+ Preconditions.checkNotNull(containerMapping);
+ Preconditions.checkNotNull(node2ContainerMap);
+ Preconditions.checkNotNull(replicationActivityStatus);
this.containerMapping = containerMapping;
this.node2ContainerMap = node2ContainerMap;
+ this.containerStateManager = containerMapping.getStateManager();
+ this.replicationStatus = replicationActivityStatus;
}
@Override
public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
EventPublisher publisher) {
- // TODO: process container report.
+
+ DatanodeDetails datanodeOrigin =
+ containerReportFromDatanode.getDatanodeDetails();
+
+ ContainerReportsProto containerReport =
+ containerReportFromDatanode.getReport();
+ try {
+
+ //update state in container db and trigger close container events
+ containerMapping.processContainerReports(datanodeOrigin, containerReport);
+
+ Set<ContainerID> containerIds = containerReport.getReportsList().stream()
+ .map(containerProto -> containerProto.getContainerID())
+ .map(ContainerID::new)
+ .collect(Collectors.toSet());
+
+ ReportResult reportResult = node2ContainerMap
+ .processReport(datanodeOrigin.getUuid(), containerIds);
+
+ //we have the report, so we can update the states for the next iteration.
+ node2ContainerMap
+ .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
+
+ for (ContainerID containerID : reportResult.getMissingContainers()) {
+ containerStateManager
+ .removeContainerReplica(containerID, datanodeOrigin);
+ emitReplicationRequestEvent(containerID, publisher);
+ }
+
+ for (ContainerID containerID : reportResult.getNewContainers()) {
+ containerStateManager.addContainerReplica(containerID, datanodeOrigin);
+
+ emitReplicationRequestEvent(containerID, publisher);
+ }
+
+ } catch (IOException e) {
+ //TODO: stop all the replication?
+ LOG.error("Error on processing container report from datanode {}",
+ datanodeOrigin, e);
+ }
+
+ }
+
+ private void emitReplicationRequestEvent(ContainerID containerID,
+ EventPublisher publisher) throws SCMException {
+ ContainerInfo container = containerStateManager.getContainer(containerID);
+
+ if (container == null) {
+ //warning unknown container
+ LOG.warn(
+ "Container is missing from containerStateManager. Can't request "
+ + "replication. {}",
+ containerID);
+ }
+ if (replicationStatus.isReplicationEnabled()) {
+
+ int existingReplicas =
+ containerStateManager.getContainerReplicas(containerID).size();
+
+ int expectedReplicas = container.getReplicationFactor().getNumber();
+
+ if (existingReplicas != expectedReplicas) {
+
+ publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ new ReplicationRequest(containerID.getId(), existingReplicas,
+ container.getReplicationFactor().getNumber()));
+ }
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
new file mode 100644
index 0000000..4a9888c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener to track the current state of replication.
+ */
+public class ReplicationActivityStatus
+ implements EventHandler<Boolean>, ReplicationActivityStatusMXBean,
+ Closeable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReplicationActivityStatus.class);
+
+ private AtomicBoolean replicationEnabled = new AtomicBoolean();
+
+ private ObjectName jmxObjectName;
+
+ public boolean isReplicationEnabled() {
+ return replicationEnabled.get();
+ }
+
+ @VisibleForTesting
+ public void setReplicationEnabled(boolean enabled) {
+ replicationEnabled.set(enabled);
+ }
+
+ @VisibleForTesting
+ public void enableReplication() {
+ replicationEnabled.set(true);
+ }
+
+ /**
+ * The replication status could be set by async events.
+ */
+ @Override
+ public void onMessage(Boolean enabled, EventPublisher publisher) {
+ replicationEnabled.set(enabled);
+ }
+
+ public void start() {
+ try {
+ this.jmxObjectName =
+ MBeans.register(
+ "StorageContainerManager", "ReplicationActivityStatus", this);
+ } catch (Exception ex) {
+ LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
+ ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.jmxObjectName != null) {
+ MBeans.unregister(jmxObjectName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
new file mode 100644
index 0000000..164bd24
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * JMX interface to monitor replication status.
+ */
+public interface ReplicationActivityStatusMXBean {
+
+ boolean isReplicationEnabled();
+
+ void setReplicationEnabled(boolean enabled);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
index ef7c546..d40cd9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
@@ -29,18 +29,24 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
public class ReplicationRequest implements Comparable<ReplicationRequest>,
Serializable {
private final long containerId;
- private final short replicationCount;
- private final short expecReplicationCount;
+ private final int replicationCount;
+ private final int expecReplicationCount;
private final long timestamp;
- public ReplicationRequest(long containerId, short replicationCount,
- long timestamp, short expecReplicationCount) {
+ public ReplicationRequest(long containerId, int replicationCount,
+ long timestamp, int expecReplicationCount) {
this.containerId = containerId;
this.replicationCount = replicationCount;
this.timestamp = timestamp;
this.expecReplicationCount = expecReplicationCount;
}
+ public ReplicationRequest(long containerId, int replicationCount,
+ int expecReplicationCount) {
+ this(containerId, replicationCount, System.currentTimeMillis(),
+ expecReplicationCount);
+ }
+
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
@@ -93,7 +99,7 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
return containerId;
}
- public short getReplicationCount() {
+ public int getReplicationCount() {
return replicationCount;
}
@@ -101,7 +107,17 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
return timestamp;
}
- public short getExpecReplicationCount() {
+ public int getExpecReplicationCount() {
return expecReplicationCount;
}
+
+ @Override
+ public String toString() {
+ return "ReplicationRequest{" +
+ "containerId=" + containerId +
+ ", replicationCount=" + replicationCount +
+ ", expecReplicationCount=" + expecReplicationCount +
+ ", timestamp=" + timestamp +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index d49dd4f..70b1e96 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -174,6 +174,15 @@ public final class SCMEvents {
new TypedEvent<>(ReplicationCompleted.class);
/**
+ * Signal for all the components (but especially for the replication
+ * manager and container report handler) that the replication could be
+ * started. Should be send only if (almost) all the container state are
+ * available from the datanodes.
+ */
+ public static final TypedEvent<Boolean> START_REPLICATION =
+ new TypedEvent<>(Boolean.class);
+
+ /**
* Private Ctor. Never Constructed.
*/
private SCMEvents() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 1960604..8ed6d59 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -68,7 +69,8 @@ public class Node2ContainerMap {
throws SCMException {
Preconditions.checkNotNull(containerIDs);
Preconditions.checkNotNull(datanodeID);
- if(dn2ContainerMap.putIfAbsent(datanodeID, containerIDs) != null) {
+ if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+ != null) {
throw new SCMException("Node already exists in the map",
DUPLICATE_DATANODE);
}
@@ -82,11 +84,13 @@ public class Node2ContainerMap {
* @throws SCMException - if we don't know about this datanode, for new DN
* use insertNewDatanode.
*/
- public void updateDatanodeMap(UUID datanodeID, Set<ContainerID> containers)
+ public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers)
throws SCMException {
Preconditions.checkNotNull(datanodeID);
Preconditions.checkNotNull(containers);
- if(dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> v) == null){
+ if (dn2ContainerMap
+ .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
+ == null) {
throw new SCMException("No such datanode", NO_SUCH_DATANODE);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index cb06cb3..2697629 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.hdds.scm.node.states;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import java.util.Collections;
import java.util.Set;
+import com.google.common.base.Preconditions;
+
/**
- * A Container Report gets processsed by the Node2Container and returns the
+ * A Container Report gets processsed by the Node2Container and returns
* Report Result class.
*/
public class ReportResult {
@@ -36,6 +39,8 @@ public class ReportResult {
Set<ContainerID> missingContainers,
Set<ContainerID> newContainers) {
this.status = status;
+ Preconditions.checkNotNull(missingContainers);
+ Preconditions.checkNotNull(newContainers);
this.missingContainers = missingContainers;
this.newContainers = newContainers;
}
@@ -80,7 +85,16 @@ public class ReportResult {
}
ReportResult build() {
- return new ReportResult(status, missingContainers, newContainers);
+
+ Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
+ Set<ContainerID> nullSafeNewContainers = this.newContainers;
+ if (nullSafeNewContainers == null) {
+ nullSafeNewContainers = Collections.emptySet();
+ }
+ if (nullSafeMissingContainers == null) {
+ nullSafeMissingContainers = Collections.emptySet();
+ }
+ return new ReportResult(status, nullSafeMissingContainers, nullSafeNewContainers);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9cb1318..47a9100 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication
+ .ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -164,9 +166,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* Key = DatanodeUuid, value = ContainerStat.
*/
private Cache<String, ContainerStat> containerReportCache;
+
private final ReplicationManager replicationManager;
+
private final LeaseManager<Long> commandWatcherLeaseManager;
+ private final ReplicationActivityStatus replicationStatus;
+
/**
* Creates a new StorageContainerManager. Configuration will be updated
* with information on the
@@ -199,19 +205,26 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
+ replicationStatus = new ReplicationActivityStatus();
+
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(scmContainerManager);
NodeReportHandler nodeReportHandler =
new NodeReportHandler(scmNodeManager);
- ContainerReportHandler containerReportHandler =
- new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
+
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
+ ContainerReportHandler containerReportHandler =
+ new ContainerReportHandler(scmContainerManager, node2ContainerMap,
+ replicationStatus);
+
+
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
@@ -221,6 +234,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
+ eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -580,6 +594,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
"server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
getDatanodeProtocolServer().start();
+ replicationStatus.start();
httpServer.start();
scmBlockManager.start();
replicationManager.start();
@@ -592,6 +607,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public void stop() {
try {
+ LOG.info("Stopping Replication Activity Status tracker.");
+ replicationStatus.close();
+ } catch (Exception ex) {
+ LOG.error("Replication Activity Status tracker stop failed.", ex);
+ }
+
+
+ try {
LOG.info("Stopping Replication Manager Service.");
replicationManager.stop();
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
new file mode 100644
index 0000000..363db99
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+ .Builder;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.replication
+ .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behaviour of the ContainerReportHandler.
+ */
+public class TestContainerReportHandler implements EventPublisher {
+
+ private List<Object> publishedEvents = new ArrayList<>();
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestContainerReportHandler.class);
+
+ @Before
+ public void resetEventCollector() {
+ publishedEvents.clear();
+ }
+
+ @Test
+ public void test() throws IOException {
+
+ //given
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
+ Mapping mapping = Mockito.mock(Mapping.class);
+
+ when(mapping.getContainer(anyLong()))
+ .thenAnswer(
+ (Answer<ContainerInfo>) invocation ->
+ new Builder()
+ .setReplicationFactor(ReplicationFactor.THREE)
+ .setContainerID((Long) invocation.getArguments()[0])
+ .build()
+ );
+
+ ContainerStateManager containerStateManager =
+ new ContainerStateManager(conf, mapping);
+
+ when(mapping.getStateManager()).thenReturn(containerStateManager);
+
+ ReplicationActivityStatus replicationActivityStatus =
+ new ReplicationActivityStatus();
+
+ ContainerReportHandler reportHandler =
+ new ContainerReportHandler(mapping, node2ContainerMap,
+ replicationActivityStatus);
+
+ DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
+ DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
+ node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>());
+ node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>());
+ node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>());
+ node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>());
+ PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+
+ Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
+ ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1");
+
+ when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE,
+ ReplicationFactor.THREE)).thenReturn(pipeline);
+
+ long c1 = containerStateManager
+ .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+ ReplicationFactor.THREE, "root").getContainerInfo()
+ .getContainerID();
+
+ long c2 = containerStateManager
+ .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+ ReplicationFactor.THREE, "root").getContainerInfo()
+ .getContainerID();
+
+ //when
+
+ //initial reports before replication is enabled. 2 containers w 3 replicas.
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn1,
+ createContainerReport(new long[] {c1, c2})), this);
+
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn2,
+ createContainerReport(new long[] {c1, c2})), this);
+
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn3,
+ createContainerReport(new long[] {c1, c2})), this);
+
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn4,
+ createContainerReport(new long[] {})), this);
+
+ Assert.assertEquals(0, publishedEvents.size());
+
+ replicationActivityStatus.enableReplication();
+
+ //no problem here
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn1,
+ createContainerReport(new long[] {c1, c2})), this);
+
+ Assert.assertEquals(0, publishedEvents.size());
+
+ //container is missing from d2
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn2,
+ createContainerReport(new long[] {c1})), this);
+
+ Assert.assertEquals(1, publishedEvents.size());
+ ReplicationRequest replicationRequest =
+ (ReplicationRequest) publishedEvents.get(0);
+
+ Assert.assertEquals(c2, replicationRequest.getContainerId());
+ Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+ Assert.assertEquals(2, replicationRequest.getReplicationCount());
+
+ //container was replicated to dn4
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn4,
+ createContainerReport(new long[] {c2})), this);
+
+ //no more event, everything is perfect
+ Assert.assertEquals(1, publishedEvents.size());
+
+ //c2 was found at dn2 (it was missing before, magic)
+ reportHandler.onMessage(
+ new ContainerReportFromDatanode(dn2,
+ createContainerReport(new long[] {c1, c2})), this);
+
+ //c2 is over replicated (dn1,dn2,dn3,dn4)
+ Assert.assertEquals(2, publishedEvents.size());
+
+ replicationRequest =
+ (ReplicationRequest) publishedEvents.get(1);
+
+ Assert.assertEquals(c2, replicationRequest.getContainerId());
+ Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+ Assert.assertEquals(4, replicationRequest.getReplicationCount());
+
+ }
+
+ private ContainerReportsProto createContainerReport(long[] containerIds) {
+
+ ContainerReportsProto.Builder crBuilder =
+ ContainerReportsProto.newBuilder();
+
+ for (long containerId : containerIds) {
+ org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder
+ ciBuilder = org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+ ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+ .setSize(5368709120L)
+ .setUsed(2000000000L)
+ .setKeyCount(100000000L)
+ .setReadCount(100000000L)
+ .setWriteCount(100000000L)
+ .setReadBytes(2000000000L)
+ .setWriteBytes(2000000000L)
+ .setContainerID(containerId)
+ .setDeleteTransactionId(0);
+
+ crBuilder.addReports(ciBuilder.build());
+ }
+
+ return crBuilder.build();
+ }
+
+ @Override
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+ EVENT_TYPE event, PAYLOAD payload) {
+ LOG.info("Event is published: {}", payload);
+ publishedEvents.add(payload);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
deleted file mode 100644
index 79f1b40..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Test classes for Node2ContainerMap.
- */
-public class Node2ContainerMapTest {
- private final static int DATANODE_COUNT = 300;
- private final static int CONTAINER_COUNT = 1000;
- private final Map<UUID, TreeSet<ContainerID>> testData = new
- ConcurrentHashMap<>();
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- private void generateData() {
- for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
- TreeSet<ContainerID> currentSet = new TreeSet<>();
- for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) {
- long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex;
- currentSet.add(new ContainerID(currentCnIndex));
- }
- testData.put(UUID.randomUUID(), currentSet);
- }
- }
-
- private UUID getFirstKey() {
- return testData.keySet().iterator().next();
- }
-
- @Before
- public void setUp() throws Exception {
- generateData();
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testIsKnownDatanode() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- UUID knownNode = getFirstKey();
- UUID unknownNode = UUID.randomUUID();
- Set<ContainerID> containerIDs = testData.get(knownNode);
- map.insertNewDatanode(knownNode, containerIDs);
- Assert.assertTrue("Not able to detect a known node",
- map.isKnownDatanode(knownNode));
- Assert.assertFalse("Unknown node detected",
- map.isKnownDatanode(unknownNode));
- }
-
- @Test
- public void testInsertNewDatanode() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- UUID knownNode = getFirstKey();
- Set<ContainerID> containerIDs = testData.get(knownNode);
- map.insertNewDatanode(knownNode, containerIDs);
- Set<ContainerID> readSet = map.getContainers(knownNode);
-
- // Assert that all elements are present in the set that we read back from
- // node map.
- Set newSet = new TreeSet((readSet));
- Assert.assertTrue(newSet.removeAll(containerIDs));
- Assert.assertTrue(newSet.size() == 0);
-
- thrown.expect(SCMException.class);
- thrown.expectMessage("already exists");
- map.insertNewDatanode(knownNode, containerIDs);
-
- map.removeDatanode(knownNode);
- map.insertNewDatanode(knownNode, containerIDs);
-
- }
-
- @Test
- public void testProcessReportCheckOneNode() throws SCMException {
- UUID key = getFirstKey();
- Set<ContainerID> values = testData.get(key);
- Node2ContainerMap map = new Node2ContainerMap();
- map.insertNewDatanode(key, values);
- Assert.assertTrue(map.isKnownDatanode(key));
- ReportResult result = map.processReport(key, values);
- Assert.assertEquals(result.getStatus(),
- Node2ContainerMap.ReportStatus.ALL_IS_WELL);
- }
-
- @Test
- public void testProcessReportInsertAll() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
-
- for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) {
- map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
- }
- // Assert all Keys are known datanodes.
- for (UUID key : testData.keySet()) {
- Assert.assertTrue(map.isKnownDatanode(key));
- }
- }
-
- /*
- For ProcessReport we have to test the following scenarios.
-
- 1. New Datanode - A new datanode appears and we have to add that to the
- SCM's Node2Container Map.
-
- 2. New Container - A Datanode exists, but a new container is added to that
- DN. We need to detect that and return a list of added containers.
-
- 3. Missing Container - A Datanode exists, but one of the expected container
- on that datanode is missing. We need to detect that.
-
- 4. We get a container report that has both the missing and new containers.
- We need to return separate lists for these.
- */
-
- /**
- * Assert that we are able to detect the addition of a new datanode.
- *
- * @throws SCMException
- */
- @Test
- public void testProcessReportDetectNewDataNode() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- // If we attempt to process a node that is not present in the map,
- // we get a result back that says, NEW_NODE_FOUND.
- UUID key = getFirstKey();
- TreeSet<ContainerID> values = testData.get(key);
- ReportResult result = map.processReport(key, values);
- Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
- result.getStatus());
- Assert.assertEquals(result.getNewContainers().size(), values.size());
- }
-
- /**
- * This test asserts that processReport is able to detect new containers
- * when it is added to a datanode. For that we populate the DN with a list
- * of containerIDs and then add few more containers and make sure that we
- * are able to detect them.
- *
- * @throws SCMException
- */
- @Test
- public void testProcessReportDetectNewContainers() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- UUID key = getFirstKey();
- TreeSet<ContainerID> values = testData.get(key);
- map.insertNewDatanode(key, values);
-
- final int newCount = 100;
- // This is not a mistake, the treeset seems to be reverse sorted.
- ContainerID last = values.pollFirst();
- TreeSet<ContainerID> addedContainers = new TreeSet<>();
- for (int x = 1; x <= newCount; x++) {
- long cTemp = last.getId() + x;
- addedContainers.add(new ContainerID(cTemp));
- }
-
- // This set is the super set of existing containers and new containers.
- TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
- newContainersSet.addAll(addedContainers);
-
- ReportResult result = map.processReport(key, newContainersSet);
-
- //Assert that expected size of missing container is same as addedContainers
- Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
- result.getStatus());
-
- Assert.assertEquals(addedContainers.size(),
- result.getNewContainers().size());
-
- // Assert that the Container IDs are the same as we added new.
- Assert.assertTrue("All objects are not removed.",
- result.getNewContainers().removeAll(addedContainers));
- }
-
- /**
- * This test asserts that processReport is able to detect missing containers
- * if they are misssing from a list.
- *
- * @throws SCMException
- */
- @Test
- public void testProcessReportDetectMissingContainers() throws SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- UUID key = getFirstKey();
- TreeSet<ContainerID> values = testData.get(key);
- map.insertNewDatanode(key, values);
-
- final int removeCount = 100;
- Random r = new Random();
-
- ContainerID first = values.pollLast();
- TreeSet<ContainerID> removedContainers = new TreeSet<>();
-
- // Pick a random container to remove it is ok to collide no issues.
- for (int x = 0; x < removeCount; x++) {
- int startBase = (int) first.getId();
- long cTemp = r.nextInt(values.size());
- removedContainers.add(new ContainerID(cTemp + startBase));
- }
-
- // This set is a new set with some containers removed.
- TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
- newContainersSet.removeAll(removedContainers);
-
- ReportResult result = map.processReport(key, newContainersSet);
-
-
- //Assert that expected size of missing container is same as addedContainers
- Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
- result.getStatus());
- Assert.assertEquals(removedContainers.size(),
- result.getMissingContainers().size());
-
- // Assert that the Container IDs are the same as we added new.
- Assert.assertTrue("All missing containers not found.",
- result.getMissingContainers().removeAll(removedContainers));
- }
-
- @Test
- public void testProcessReportDetectNewAndMissingContainers() throws
- SCMException {
- Node2ContainerMap map = new Node2ContainerMap();
- UUID key = getFirstKey();
- TreeSet<ContainerID> values = testData.get(key);
- map.insertNewDatanode(key, values);
-
- Set<ContainerID> insertedSet = new TreeSet<>();
- // Insert nodes from 1..30
- for (int x = 1; x <= 30; x++) {
- insertedSet.add(new ContainerID(x));
- }
-
-
- final int removeCount = 100;
- Random r = new Random();
-
- ContainerID first = values.pollLast();
- TreeSet<ContainerID> removedContainers = new TreeSet<>();
-
- // Pick a random container to remove it is ok to collide no issues.
- for (int x = 0; x < removeCount; x++) {
- int startBase = (int) first.getId();
- long cTemp = r.nextInt(values.size());
- removedContainers.add(new ContainerID(cTemp + startBase));
- }
-
- Set<ContainerID> newSet = new TreeSet<>(values);
- newSet.addAll(insertedSet);
- newSet.removeAll(removedContainers);
-
- ReportResult result = map.processReport(key, newSet);
-
-
- Assert.assertEquals(
- Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
- result.getStatus());
- Assert.assertEquals(removedContainers.size(),
- result.getMissingContainers().size());
-
-
- // Assert that the Container IDs are the same as we added new.
- Assert.assertTrue("All missing containers not found.",
- result.getMissingContainers().removeAll(removedContainers));
-
- Assert.assertEquals(insertedSet.size(),
- result.getNewContainers().size());
-
- // Assert that the Container IDs are the same as we added new.
- Assert.assertTrue("All inserted containers are not found.",
- result.getNewContainers().removeAll(insertedSet));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
new file mode 100644
index 0000000..633653b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test classes for Node2ContainerMap.
+ */
+public class TestNode2ContainerMap {
+ private final static int DATANODE_COUNT = 300;
+ private final static int CONTAINER_COUNT = 1000;
+ private final Map<UUID, TreeSet<ContainerID>> testData = new
+ ConcurrentHashMap<>();
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private void generateData() {
+ for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
+ TreeSet<ContainerID> currentSet = new TreeSet<>();
+ for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) {
+ long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex;
+ currentSet.add(new ContainerID(currentCnIndex));
+ }
+ testData.put(UUID.randomUUID(), currentSet);
+ }
+ }
+
+ private UUID getFirstKey() {
+ return testData.keySet().iterator().next();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ generateData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testIsKnownDatanode() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ UUID knownNode = getFirstKey();
+ UUID unknownNode = UUID.randomUUID();
+ Set<ContainerID> containerIDs = testData.get(knownNode);
+ map.insertNewDatanode(knownNode, containerIDs);
+ Assert.assertTrue("Not able to detect a known node",
+ map.isKnownDatanode(knownNode));
+ Assert.assertFalse("Unknown node detected",
+ map.isKnownDatanode(unknownNode));
+ }
+
+ @Test
+ public void testInsertNewDatanode() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ UUID knownNode = getFirstKey();
+ Set<ContainerID> containerIDs = testData.get(knownNode);
+ map.insertNewDatanode(knownNode, containerIDs);
+ Set<ContainerID> readSet = map.getContainers(knownNode);
+
+ // Assert that all elements are present in the set that we read back from
+ // node map.
+ Set newSet = new TreeSet((readSet));
+ Assert.assertTrue(newSet.removeAll(containerIDs));
+ Assert.assertTrue(newSet.size() == 0);
+
+ thrown.expect(SCMException.class);
+ thrown.expectMessage("already exists");
+ map.insertNewDatanode(knownNode, containerIDs);
+
+ map.removeDatanode(knownNode);
+ map.insertNewDatanode(knownNode, containerIDs);
+
+ }
+
+ @Test
+ public void testProcessReportCheckOneNode() throws SCMException {
+ UUID key = getFirstKey();
+ Set<ContainerID> values = testData.get(key);
+ Node2ContainerMap map = new Node2ContainerMap();
+ map.insertNewDatanode(key, values);
+ Assert.assertTrue(map.isKnownDatanode(key));
+ ReportResult result = map.processReport(key, values);
+ Assert.assertEquals(result.getStatus(),
+ Node2ContainerMap.ReportStatus.ALL_IS_WELL);
+ }
+
+ @Test
+ public void testUpdateDatanodeMap() throws SCMException {
+ UUID datanodeId = getFirstKey();
+ Set<ContainerID> values = testData.get(datanodeId);
+ Node2ContainerMap map = new Node2ContainerMap();
+ map.insertNewDatanode(datanodeId, values);
+ Assert.assertTrue(map.isKnownDatanode(datanodeId));
+ Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+ //remove one container
+ values.remove(values.iterator().next());
+ Assert.assertEquals(CONTAINER_COUNT - 1, values.size());
+ Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+ map.setContainersForDatanode(datanodeId, values);
+
+ Assert.assertEquals(values.size(), map.getContainers(datanodeId).size());
+ Assert.assertEquals(values, map.getContainers(datanodeId));
+ }
+
+ @Test
+ public void testProcessReportInsertAll() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+
+ for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) {
+ map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
+ }
+ // Assert all Keys are known datanodes.
+ for (UUID key : testData.keySet()) {
+ Assert.assertTrue(map.isKnownDatanode(key));
+ }
+ }
+
+ /*
+ For ProcessReport we have to test the following scenarios.
+
+ 1. New Datanode - A new datanode appears and we have to add that to the
+ SCM's Node2Container Map.
+
+ 2. New Container - A Datanode exists, but a new container is added to that
+ DN. We need to detect that and return a list of added containers.
+
+ 3. Missing Container - A Datanode exists, but one of the expected container
+ on that datanode is missing. We need to detect that.
+
+ 4. We get a container report that has both the missing and new containers.
+ We need to return separate lists for these.
+ */
+
+ /**
+ * Assert that we are able to detect the addition of a new datanode.
+ *
+ * @throws SCMException
+ */
+ @Test
+ public void testProcessReportDetectNewDataNode() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ // If we attempt to process a node that is not present in the map,
+ // we get a result back that says, NEW_NODE_FOUND.
+ UUID key = getFirstKey();
+ TreeSet<ContainerID> values = testData.get(key);
+ ReportResult result = map.processReport(key, values);
+ Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
+ result.getStatus());
+ Assert.assertEquals(result.getNewContainers().size(), values.size());
+ }
+
+ /**
+ * This test asserts that processReport is able to detect new containers
+ * when it is added to a datanode. For that we populate the DN with a list
+ * of containerIDs and then add few more containers and make sure that we
+ * are able to detect them.
+ *
+ * @throws SCMException
+ */
+ @Test
+ public void testProcessReportDetectNewContainers() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ UUID key = getFirstKey();
+ TreeSet<ContainerID> values = testData.get(key);
+ map.insertNewDatanode(key, values);
+
+ final int newCount = 100;
+ // This is not a mistake, the treeset seems to be reverse sorted.
+ ContainerID last = values.first();
+ TreeSet<ContainerID> addedContainers = new TreeSet<>();
+ for (int x = 1; x <= newCount; x++) {
+ long cTemp = last.getId() + x;
+ addedContainers.add(new ContainerID(cTemp));
+ }
+
+ // This set is the super set of existing containers and new containers.
+ TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
+ newContainersSet.addAll(addedContainers);
+
+ ReportResult result = map.processReport(key, newContainersSet);
+
+ //Assert that expected size of missing container is same as addedContainers
+ Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
+ result.getStatus());
+
+ Assert.assertEquals(addedContainers.size(),
+ result.getNewContainers().size());
+
+ // Assert that the Container IDs are the same as we added new.
+ Assert.assertTrue("All objects are not removed.",
+ result.getNewContainers().removeAll(addedContainers));
+ }
+
+ /**
+ * This test asserts that processReport is able to detect missing containers
+ * if they are misssing from a list.
+ *
+ * @throws SCMException
+ */
+ @Test
+ public void testProcessReportDetectMissingContainers() throws SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ UUID key = getFirstKey();
+ TreeSet<ContainerID> values = testData.get(key);
+ map.insertNewDatanode(key, values);
+
+ final int removeCount = 100;
+ Random r = new Random();
+
+ ContainerID first = values.last();
+ TreeSet<ContainerID> removedContainers = new TreeSet<>();
+
+ // Pick a random container to remove it is ok to collide no issues.
+ for (int x = 0; x < removeCount; x++) {
+ int startBase = (int) first.getId();
+ long cTemp = r.nextInt(values.size());
+ removedContainers.add(new ContainerID(cTemp + startBase));
+ }
+
+ // This set is a new set with some containers removed.
+ TreeSet<ContainerID> newContainersSet = new TreeSet<>(values);
+ newContainersSet.removeAll(removedContainers);
+
+ ReportResult result = map.processReport(key, newContainersSet);
+
+
+ //Assert that expected size of missing container is same as addedContainers
+ Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
+ result.getStatus());
+ Assert.assertEquals(removedContainers.size(),
+ result.getMissingContainers().size());
+
+ // Assert that the Container IDs are the same as we added new.
+ Assert.assertTrue("All missing containers not found.",
+ result.getMissingContainers().removeAll(removedContainers));
+ }
+
+ @Test
+ public void testProcessReportDetectNewAndMissingContainers() throws
+ SCMException {
+ Node2ContainerMap map = new Node2ContainerMap();
+ UUID key = getFirstKey();
+ TreeSet<ContainerID> values = testData.get(key);
+ map.insertNewDatanode(key, values);
+
+ Set<ContainerID> insertedSet = new TreeSet<>();
+ // Insert nodes from 1..30
+ for (int x = 1; x <= 30; x++) {
+ insertedSet.add(new ContainerID(x));
+ }
+
+
+ final int removeCount = 100;
+ Random r = new Random();
+
+ ContainerID first = values.last();
+ TreeSet<ContainerID> removedContainers = new TreeSet<>();
+
+ // Pick a random container to remove it is ok to collide no issues.
+ for (int x = 0; x < removeCount; x++) {
+ int startBase = (int) first.getId();
+ long cTemp = r.nextInt(values.size());
+ removedContainers.add(new ContainerID(cTemp + startBase));
+ }
+
+ Set<ContainerID> newSet = new TreeSet<>(values);
+ newSet.addAll(insertedSet);
+ newSet.removeAll(removedContainers);
+
+ ReportResult result = map.processReport(key, newSet);
+
+
+ Assert.assertEquals(
+ Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
+ result.getStatus());
+ Assert.assertEquals(removedContainers.size(),
+ result.getMissingContainers().size());
+
+
+ // Assert that the Container IDs are the same as we added new.
+ Assert.assertTrue("All missing containers not found.",
+ result.getMissingContainers().removeAll(removedContainers));
+
+ Assert.assertEquals(insertedSet.size(),
+ result.getNewContainers().size());
+
+ // Assert that the Container IDs are the same as we added new.
+ Assert.assertTrue("All inserted containers are not found.",
+ result.getNewContainers().removeAll(insertedSet));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org