Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/09/20 22:21:40 UTC
[18/50] [abbrv] hadoop git commit: HDDS-476. Add Pipeline reports to make pipeline active on SCM restart. Contributed by Mukul Kumar Singh.
HDDS-476. Add Pipeline reports to make pipeline active on SCM restart.
Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0956ee2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0956ee2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0956ee2
Branch: refs/heads/HDDS-4
Commit: c0956ee2a879d1f82938dd2b8bab79b09ae32eac
Parents: 0712537e
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 18:49:13 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 18:52:08 2018 +0530
----------------------------------------------------------------------
.../hadoop/hdds/scm/XceiverClientRatis.java | 2 +-
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 5 +
.../scm/container/common/helpers/Pipeline.java | 22 ++-
.../container/common/helpers/PipelineID.java | 13 +-
.../common/src/main/resources/ozone-default.xml | 8 +
.../apache/hadoop/hdds/scm/HddsServerUtil.java | 21 +++
.../common/report/PipelineReportPublisher.java | 73 +++++++++
.../common/report/ReportPublisherFactory.java | 4 +
.../states/endpoint/RegisterEndpointTask.java | 8 +-
.../transport/server/XceiverServerGrpc.java | 16 ++
.../transport/server/XceiverServerSpi.java | 9 ++
.../server/ratis/XceiverServerRatis.java | 63 ++++----
.../container/ozoneimpl/OzoneContainer.java | 12 ++
.../StorageContainerDatanodeProtocol.java | 10 +-
.../protocol/StorageContainerNodeProtocol.java | 6 +-
...rDatanodeProtocolClientSideTranslatorPB.java | 6 +-
...rDatanodeProtocolServerSideTranslatorPB.java | 5 +-
.../StorageContainerDatanodeProtocol.proto | 10 ++
.../ozone/container/common/ScmTestMock.java | 8 +-
.../hdds/scm/container/ContainerMapping.java | 19 ---
.../scm/container/ContainerReportHandler.java | 6 +-
.../hadoop/hdds/scm/container/Mapping.java | 15 +-
.../hadoop/hdds/scm/events/SCMEvents.java | 15 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 5 +-
.../hadoop/hdds/scm/node/StaleNodeHandler.java | 19 +--
.../hdds/scm/node/states/Node2ContainerMap.java | 123 ++------------
.../hdds/scm/node/states/Node2ObjectsMap.java | 162 +++++++++++++++++++
.../hdds/scm/node/states/ReportResult.java | 105 ++++++------
.../hdds/scm/pipelines/Node2PipelineMap.java | 45 +-----
.../scm/pipelines/PipelineCloseHandler.java | 24 ++-
.../hdds/scm/pipelines/PipelineManager.java | 52 +++---
.../scm/pipelines/PipelineReportHandler.java | 59 +++++++
.../hdds/scm/pipelines/PipelineSelector.java | 103 ++++++------
.../scm/pipelines/ratis/RatisManagerImpl.java | 41 ++---
.../standalone/StandaloneManagerImpl.java | 44 ++---
.../server/SCMDatanodeHeartbeatDispatcher.java | 23 +++
.../scm/server/SCMDatanodeProtocolServer.java | 16 +-
.../scm/server/StorageContainerManager.java | 11 +-
.../org/apache/hadoop/hdds/scm/TestUtils.java | 10 +-
.../hdds/scm/container/MockNodeManager.java | 4 +-
.../hadoop/hdds/scm/node/TestNodeManager.java | 6 +-
.../scm/node/states/TestNode2ContainerMap.java | 28 ++--
.../ozone/container/common/TestEndPoint.java | 5 +-
.../testutils/ReplicationNodeManagerMock.java | 5 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 2 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 2 +-
.../hdds/scm/pipeline/TestSCMRestart.java | 20 ++-
.../apache/hadoop/ozone/RatisTestHelper.java | 9 ++
.../hadoop/ozone/web/client/TestKeys.java | 2 +-
.../hadoop/ozone/web/client/TestKeysRatis.java | 2 -
hadoop-project/pom.xml | 2 +-
51 files changed, 809 insertions(+), 476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 946abfb..4c4de7f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -110,7 +110,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
- .groupRemove(group.getGroupId(), peer.getId()));
+ .groupRemove(group.getGroupId(), true, peer.getId()));
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 492be82..856d113 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,11 @@ public final class HddsConfigKeys {
public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
"60s";
+ public static final String HDDS_PIPELINE_REPORT_INTERVAL =
+ "hdds.pipeline.report.interval";
+ public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
+ "60s";
+
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index ef148e5..777efa7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
/**
@@ -83,7 +83,7 @@ public class Pipeline {
this.type = replicationType;
this.factor = replicationFactor;
this.id = id;
- datanodes = new TreeMap<>();
+ datanodes = new ConcurrentHashMap<>();
}
@Override
@@ -151,9 +151,21 @@ public class Pipeline {
return getDatanodes().get(leaderID);
}
- public void addMember(DatanodeDetails datanodeDetails) {
- datanodes.put(datanodeDetails.getUuid().toString(),
- datanodeDetails);
+ /**
+ * Adds a datanode to the pipeline.
+ * @param datanodeDetails datanode to be added.
+ * @return true if the datanode was not already present, false otherwise
+ */
+ public boolean addMember(DatanodeDetails datanodeDetails) {
+ return datanodes.put(datanodeDetails.getUuid().toString(),
+ datanodeDetails) == null;
+
+ }
+
+ public void resetPipeline() {
+ // reset datanodes in pipeline and learn about them through
+ // pipeline reports on SCM restart
+ datanodes.clear();
}
public Map<String, DatanodeDetails> getDatanodes() {
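
For orientation, a minimal sketch (not part of the patch; the helper method names here are illustrative) of how the two new Pipeline methods work together across an SCM restart: resetPipeline() forgets the membership, and each incoming pipeline report re-learns it through addMember(), whose new boolean return value distinguishes a first report from a repeat.

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

public final class PipelineRestartSketch {
  private PipelineRestartSketch() { }

  // On SCM restart: drop the stale datanode membership.
  static void onScmRestart(Pipeline pipeline) {
    pipeline.resetPipeline();
  }

  // On each pipeline report: re-add the reporting datanode.
  // Returns true only if this datanode was not already known.
  static boolean onPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
    return pipeline.addMember(dn);
  }
}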
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
index 473ebc5..6e27a71 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
@@ -28,7 +28,7 @@ import java.util.UUID;
* in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize
* the ratis group they are part of.
*/
-public class PipelineID {
+public final class PipelineID implements Comparable<PipelineID> {
private UUID id;
private RaftGroupId groupId;
@@ -42,8 +42,12 @@ public class PipelineID {
return new PipelineID(UUID.randomUUID());
}
+ public static PipelineID valueOf(UUID id) {
+ return new PipelineID(id);
+ }
+
public static PipelineID valueOf(RaftGroupId groupId) {
- return new PipelineID(groupId.getUuid());
+ return valueOf(groupId.getUuid());
}
public RaftGroupId getRaftGroupID() {
@@ -68,6 +72,11 @@ public class PipelineID {
}
@Override
+ public int compareTo(PipelineID o) {
+ return this.id.compareTo(o.id);
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
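
A small illustration (not in the patch) of what the new Comparable implementation and valueOf(UUID) overload buy: the same UUID can arrive either raw or wrapped in a Ratis RaftGroupId, and the resulting PipelineIDs compare equal, so they can live in sorted collections.

import java.util.TreeSet;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.ratis.protocol.RaftGroupId;

public final class PipelineIdSketch {
  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();

    // The same UUID by two routes must yield equal PipelineIDs.
    PipelineID fromUuid = PipelineID.valueOf(uuid);
    PipelineID fromGroup = PipelineID.valueOf(RaftGroupId.valueOf(uuid));
    assert fromUuid.compareTo(fromGroup) == 0;

    // Comparable makes PipelineID usable in sorted sets;
    // equal ids collapse to a single entry.
    TreeSet<PipelineID> ids = new TreeSet<>();
    ids.add(fromUuid);
    ids.add(fromGroup);
    assert ids.size() == 1;
  }
}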
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a74124e..f7681e8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -224,6 +224,14 @@
received from SCM to SCM. Unit could be defined with postfix
(ns,ms,s,m,h,d)</description>
</property>
+ <property>
+ <name>hdds.pipeline.report.interval</name>
+ <value>60000ms</value>
+ <tag>OZONE, PIPELINE, MANAGEMENT</tag>
+ <description>Time interval at which the datanode sends its pipeline
+ report. Each datanode periodically sends its pipeline report to SCM.
+ The unit can be defined with a postfix (ns,ms,s,m,h,d)</description>
+ </property>
<!--Ozone Settings-->
<property>
<name>ozone.administrators</name>
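
For reference, a sketch (assuming OzoneConfiguration as the usual config entry point) of how this interval is consumed; it mirrors PipelineReportPublisher.getReportFrequency() later in this diff, and "60000ms", "60s" and "1m" all parse to the same value.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT;

public final class PipelineReportIntervalSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    long intervalMs = conf.getTimeDuration(
        HDDS_PIPELINE_REPORT_INTERVAL,
        HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,   // "60s"
        TimeUnit.MILLISECONDS);
    System.out.println("pipeline report interval = " + intervalMs + " ms");
  }
}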
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
index 580d027..d505be3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -312,4 +315,22 @@ public final class HddsServerUtil {
services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
return services;
}
+
+ public static String getOzoneDatanodeRatisDirectory(Configuration conf) {
+ final String ratisDir = File.separator + "ratis";
+ String storageDir = conf.get(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+ if (Strings.isNullOrEmpty(storageDir)) {
+ storageDir = conf.get(OzoneConfigKeys
+ .OZONE_METADATA_DIRS);
+ Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
+ "cannot be null. Please check your configs.");
+ storageDir = storageDir.concat(ratisDir);
+ LOG.warn("Storage directory for Ratis is not configured." +
+ "Mapping Ratis storage under {}. It is a good idea " +
+ "to map this to an SSD disk.", storageDir);
+ }
+ return storageDir;
+ }
}
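
A quick sketch of the fallback this helper implements (OzoneConfiguration is assumed as the config type; the paths are examples on a Unix filesystem):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;

public final class RatisDirSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();

    // No explicit Ratis dir: falls back to ozone.metadata.dirs + "/ratis"
    // and logs the warning seen above.
    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, "/data/ozone/meta");
    System.out.println(
        HddsServerUtil.getOzoneDatanodeRatisDirectory(conf));
    // -> /data/ozone/meta/ratis

    // An explicit setting wins.
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
        "/ssd/ratis");
    System.out.println(
        HddsServerUtil.getOzoneDatanodeRatisDirectory(conf));
    // -> /ssd/ratis
  }
}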
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
new file mode 100644
index 0000000..e7f4347
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes the PipelineReport which is sent to SCM as part of the
+ * heartbeat. The PipelineReport consists of the following information
+ * about each pipeline:
+ * - pipelineID
+ */
+public class PipelineReportPublisher extends
+ ReportPublisher<PipelineReportsProto> {
+
+ private Long pipelineReportInterval = null;
+
+ @Override
+ protected long getReportFrequency() {
+ if (pipelineReportInterval == null) {
+ pipelineReportInterval = getConf().getTimeDuration(
+ HDDS_PIPELINE_REPORT_INTERVAL,
+ HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+ getConf());
+
+ Preconditions.checkState(
+ heartbeatFrequency <= pipelineReportInterval,
+ HDDS_PIPELINE_REPORT_INTERVAL +
+ " cannot be configured lower than heartbeat frequency.");
+ }
+ // Add a random delay (0 to one report interval) on top of the pipeline
+ // report interval (60s by default) so that the SCM is not overwhelmed
+ // by pipeline reports arriving in sync.
+ return pipelineReportInterval + getRandomReportDelay();
+ }
+
+ private long getRandomReportDelay() {
+ return RandomUtils.nextLong(0, pipelineReportInterval);
+ }
+
+ @Override
+ protected PipelineReportsProto getReport() {
+ return getContext().getParent().getContainer().getPipelineReport();
+ }
+}
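
Worth noting the cadence getReportFrequency() produces: with the 60s default, each report fires after interval + uniform(0, interval), i.e. somewhere in 60-120s, which spreads reports from many datanodes apart. A toy sketch of the same jitter arithmetic:

import org.apache.commons.lang3.RandomUtils;

public final class ReportJitterSketch {
  public static void main(String[] args) {
    long intervalMs = 60_000L;
    // Base interval plus a uniform random delay in [0, interval),
    // matching getReportFrequency() above.
    long nextReportMs = intervalMs + RandomUtils.nextLong(0, intervalMs);
    System.out.println("next report in " + nextReportMs + " ms (60s..120s)");
  }
}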
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index ea89280..1c456a0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -53,6 +55,8 @@ public class ReportPublisherFactory {
ContainerReportPublisher.class);
report2publisher.put(CommandStatusReportsProto.class,
CommandStatusReportPublisher.class);
+ report2publisher.put(PipelineReportsProto.class,
+ PipelineReportPublisher.class);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index ccab095..690aa01 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.hdds.protocol.proto
@@ -108,13 +110,15 @@ public final class RegisterEndpointTask implements
rpcEndPoint.lock();
try {
- ContainerReportsProto contianerReport = datanodeContainerManager
+ ContainerReportsProto containerReport = datanodeContainerManager
.getContainerReport();
NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
+ PipelineReportsProto pipelineReportsProto =
+ datanodeContainerManager.getPipelineReport();
// TODO : Add responses to the command Queue.
SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
.register(datanodeDetails.getProtoBufMessage(), nodeReport,
- contianerReport);
+ containerReport, pipelineReportsProto);
Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
.equals(datanodeDetails.getUuid()),
"Unexpected datanode ID in the response.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4a90144..83e742c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -38,6 +41,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
/**
* Creates a Grpc server endpoint that acts as the communication layer for
@@ -47,6 +53,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
private static final Logger
LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
private int port;
+ private UUID id;
private Server server;
private final ContainerDispatcher storageContainer;
@@ -59,6 +66,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
ContainerDispatcher dispatcher, BindableService... additionalServices) {
Preconditions.checkNotNull(conf);
+ this.id = datanodeDetails.getUuid();
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
// Get an available port on current node and
@@ -123,4 +131,12 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
}
+
+ @Override
+ public List<PipelineReport> getPipelineReport() {
+ return Collections.singletonList(
+ PipelineReport.newBuilder()
+ .setPipelineID(PipelineID.valueOf(id).getProtobuf())
+ .build());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 1863f6d..8c3fa5c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReport;
import java.io.IOException;
+import java.util.List;
/** A server endpoint that acts as the communication layer for Ozone
* containers. */
@@ -49,4 +52,10 @@ public interface XceiverServerSpi {
void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID)
throws IOException;
+
+ /**
+ * Get the pipeline report for this XceiverServer instance.
+ * @return list of reports, one per pipeline.
+ */
+ List<PipelineReport> getPipelineReport();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 24ea0b9..d88995b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -19,17 +19,18 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -68,6 +69,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -96,12 +99,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
- private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
+ private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
throws IOException {
Objects.requireNonNull(dd, "id == null");
this.port = port;
- RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+ RaftProperties serverProperties = newRaftProperties(conf);
final int numWriteChunkThreads = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
@@ -118,15 +121,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
new ContainerStateMachine(dispatcher, chunkExecutor, this);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
- .setGroup(RatisHelper.emptyRaftGroup())
.setProperties(serverProperties)
.setStateMachine(stateMachine)
.build();
}
- private RaftProperties newRaftProperties(Configuration conf,
- String storageDir) {
+ private RaftProperties newRaftProperties(Configuration conf) {
final RaftProperties properties = new RaftProperties();
// Set rpc type
@@ -235,6 +236,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
// Set the ratis storage directory
+ String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
// For grpc set the maximum message size
@@ -253,23 +255,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails datanodeDetails, Configuration ozoneConf,
ContainerDispatcher dispatcher, StateContext context) throws IOException {
- final String ratisDir = File.separator + "ratis";
int localPort = ozoneConf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
- String storageDir = ozoneConf.get(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
-
- if (Strings.isNullOrEmpty(storageDir)) {
- storageDir = ozoneConf.get(OzoneConfigKeys
- .OZONE_METADATA_DIRS);
- Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
- "cannot be null, Please check your configs.");
- storageDir = storageDir.concat(ratisDir);
- LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
- "storage under {}. It is a good idea to map this to an SSD disk.",
- storageDir);
- }
// Get an available port on current node and
// use that as the container port
@@ -282,13 +270,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
socket.bind(address);
localPort = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", localPort);
- // If we have random local ports configured this means that it
- // probably running under MiniOzoneCluster. Ratis locks the storage
- // directories, so we need to pass different local directory for each
- // local instance. So we map ratis directories under datanode ID.
- storageDir =
- storageDir.concat(File.separator +
- datanodeDetails.getUuidString());
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", localPort, e);
@@ -296,7 +277,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
}
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
- return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
+ return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context);
}
@@ -363,7 +344,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void submitRequest(
ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
throws IOException {
- // ReplicationLevel.ALL ensures the transactions corresponding to
+ // ReplicationLevel.MAJORITY ensures the transactions corresponding to
// the request here are applied on all the raft servers.
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
@@ -427,13 +408,27 @@ public final class XceiverServerRatis implements XceiverServerSpi {
+ ".Reason : " + action.getClosePipeline().getDetailedReason());
}
- void handleNodeSlowness(
- RaftGroup group, RoleInfoProto roleInfoProto) {
+ @Override
+ public List<PipelineReport> getPipelineReport() {
+ try {
+ Iterable<RaftGroupId> gids = server.getGroupIds();
+ List<PipelineReport> reports = new ArrayList<>();
+ for (RaftGroupId groupId : gids) {
+ reports.add(PipelineReport.newBuilder()
+ .setPipelineID(PipelineID.valueOf(groupId).getProtobuf())
+ .build());
+ }
+ return reports;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
}
- void handleNoLeader(
- RaftGroup group, RoleInfoProto roleInfoProto) {
+ void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 72a5804..ebacf75 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -164,6 +166,16 @@ public class OzoneContainer {
return this.containerSet.getContainerReport();
}
+ public PipelineReportsProto getPipelineReport() {
+ PipelineReportsProto.Builder pipelineReportsProto =
+ PipelineReportsProto.newBuilder();
+ for (XceiverServerSpi serverInstance : server) {
+ pipelineReportsProto
+ .addAllPipelineReport(serverInstance.getPipelineReport());
+ }
+ return pipelineReportsProto.build();
+ }
+
/**
* Submit ContainerRequest.
* @param request
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index a950a31..9296524 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -69,9 +71,11 @@ public interface StorageContainerDatanodeProtocol {
* @param containerReportsRequestProto - Container Reports.
* @return SCM Command.
*/
- SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
- NodeReportProto nodeReport, ContainerReportsProto
- containerReportsRequestProto) throws IOException;
+ SCMRegisteredResponseProto register(
+ DatanodeDetailsProto datanodeDetails,
+ NodeReportProto nodeReport,
+ ContainerReportsProto containerReportsRequestProto,
+ PipelineReportsProto pipelineReports) throws IOException;
/**
* Used by datanode to send block deletion ACK to SCM.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index c9ef43f..b3c3eb3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
@@ -51,10 +53,12 @@ public interface StorageContainerNodeProtocol {
* Register the node if the node finds that it is not registered with any SCM.
* @param datanodeDetails DatanodeDetails
* @param nodeReport NodeReportProto
+ * @param pipelineReport PipelineReportsProto
* @return SCMHeartbeatResponseProto
*/
RegisteredCommand register(DatanodeDetails datanodeDetails,
- NodeReportProto nodeReport);
+ NodeReportProto nodeReport,
+ PipelineReportsProto pipelineReport);
/**
* Send heartbeat to indicate the datanode is alive and doing well.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 40fe189..b9cf6f9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -20,6 +20,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -149,12 +151,14 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
@Override
public SCMRegisteredResponseProto register(
DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
- ContainerReportsProto containerReportsRequestProto)
+ ContainerReportsProto containerReportsRequestProto,
+ PipelineReportsProto pipelineReportsProto)
throws IOException {
SCMRegisterRequestProto.Builder req =
SCMRegisterRequestProto.newBuilder();
req.setDatanodeDetails(datanodeDetailsProto);
req.setContainerReport(containerReportsRequestProto);
+ req.setPipelineReports(pipelineReportsProto);
req.setNodeReport(nodeReport);
final SCMRegisteredResponseProto response;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 7e8bd8a..ed01822 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
@@ -76,8 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
ContainerReportsProto containerRequestProto = request
.getContainerReport();
NodeReportProto dnNodeReport = request.getNodeReport();
+ PipelineReportsProto pipelineReport = request.getPipelineReports();
return impl.register(request.getDatanodeDetails(), dnNodeReport,
- containerRequestProto);
+ containerRequestProto, pipelineReport);
} catch (IOException e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 0a69343..78758cb 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -52,6 +52,7 @@ message SCMRegisterRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
required NodeReportProto nodeReport = 2;
required ContainerReportsProto containerReport = 3;
+ required PipelineReportsProto pipelineReports = 4;
}
/**
@@ -82,6 +83,7 @@ message SCMHeartbeatRequestProto {
optional CommandStatusReportsProto commandStatusReport = 4;
optional ContainerActionsProto containerActions = 5;
optional PipelineActionsProto pipelineActions = 6;
+ optional PipelineReportsProto pipelineReports = 7;
}
/*
@@ -163,6 +165,14 @@ message ContainerAction {
optional Reason reason = 3;
}
+message PipelineReport {
+ required PipelineID pipelineID = 1;
+}
+
+message PipelineReportsProto {
+ repeated PipelineReport pipelineReport = 1;
+}
+
message PipelineActionsProto {
repeated PipelineAction pipelineActions = 1;
}
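
A sketch (not in the patch) of building the new messages from Java, which is effectively what OzoneContainer.getPipelineReport() above does, one PipelineReport per pipeline the datanode serves:

import java.util.UUID;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

public final class PipelineReportProtoSketch {
  public static void main(String[] args) {
    PipelineReport report = PipelineReport.newBuilder()
        .setPipelineID(PipelineID.valueOf(UUID.randomUUID()).getProtobuf())
        .build();

    // The repeated field aggregates per-pipeline reports into the
    // registration/heartbeat payload.
    PipelineReportsProto reports = PipelineReportsProto.newBuilder()
        .addPipelineReport(report)
        .build();
    System.out.println(reports);
  }
}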
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 751775f..27b6272 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -18,6 +18,10 @@ package org.apache.hadoop.ozone.container.common;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -214,8 +218,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
public StorageContainerDatanodeProtocolProtos
.SCMRegisteredResponseProto register(
DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
- StorageContainerDatanodeProtocolProtos.ContainerReportsProto
- containerReportsRequestProto)
+ ContainerReportsProto containerReportsRequestProto,
+ PipelineReportsProto pipelineReportsProto)
throws IOException {
rpcCount.incrementAndGet();
updateNodeReport(datanodeDetailsProto, nodeReport);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 206e24b..eb0a0b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -466,24 +466,6 @@ public class ContainerMapping implements Mapping {
return new ContainerWithPipeline(containerInfo, pipeline);
}
- public void handlePipelineClose(PipelineID pipelineID) {
- try {
- Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
- if (pipeline != null) {
- pipelineSelector.finalizePipeline(pipeline);
- } else {
- LOG.debug("pipeline:{} not found", pipelineID);
- }
- } catch (Exception e) {
- LOG.info("failed to close pipeline:{}", pipelineID, e);
- }
- }
-
- public Set<PipelineID> getPipelineOnDatanode(
- DatanodeDetails datanodeDetails) {
- return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
- }
-
/**
* Process container report from Datanode.
* <p>
@@ -710,7 +692,6 @@ public class ContainerMapping implements Mapping {
return containerStore;
}
- @VisibleForTesting
public PipelineSelector getPipelineSelector() {
return pipelineSelector;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index dcbd49c..3f156de 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -89,20 +89,20 @@ public class ContainerReportHandler implements
.map(ContainerID::new)
.collect(Collectors.toSet());
- ReportResult reportResult = node2ContainerMap
+ ReportResult<ContainerID> reportResult = node2ContainerMap
.processReport(datanodeOrigin.getUuid(), containerIds);
//we have the report, so we can update the states for the next iteration.
node2ContainerMap
.setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
- for (ContainerID containerID : reportResult.getMissingContainers()) {
+ for (ContainerID containerID : reportResult.getMissingEntries()) {
containerStateManager
.removeContainerReplica(containerID, datanodeOrigin);
checkReplicationState(containerID, publisher);
}
- for (ContainerID containerID : reportResult.getNewContainers()) {
+ for (ContainerID containerID : reportResult.getNewEntries()) {
containerStateManager.addContainerReplica(containerID, datanodeOrigin);
checkReplicationState(containerID, publisher);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index 1b0c57c..5ed80cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -25,13 +25,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -138,15 +137,5 @@ public interface Mapping extends Closeable {
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) throws IOException;
- /**
- * Handle a pipeline close event.
- * @param pipelineID pipeline id
- */
- void handlePipelineClose(PipelineID pipelineID);
-
- /**
- * Get set of pipeline for a specific datanode.
- * @param datanodeDetails datanode for which pipelines needs to be fetched.
- */
- Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
+ PipelineSelector getPipelineSelector();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 9d72eb1..745e052 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,10 +27,13 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.DeleteBlockCommandStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+ .CloseContainerRetryableReq;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerActionsFromDatanode;
@@ -72,8 +75,7 @@ public final class SCMEvents {
/**
* ContainerReports are send out by Datanodes. This report is received by
- * SCMDatanodeHeartbeatDispatcher and Container_Report Event
- * isTestSCMDatanodeHeartbeatDispatcher generated.
+ * SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
*/
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
@@ -87,6 +89,13 @@ public final class SCMEvents {
"Container_Actions");
/**
+ * PipelineReports are sent out by Datanodes. This report is received by
+ * SCMDatanodeHeartbeatDispatcher and Pipeline_Report Event is generated.
+ */
+ public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT =
+ new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
+
+ /**
* PipelineActions are sent by Datanode. This event is received by
* SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
*/
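
On the receiving side the new typed event is wired like the existing report events. A hedged sketch of the subscription (the real wiring lives in StorageContainerManager, whose diff appears in the stat list but not below; getDatanodeDetails() is assumed from the common ReportFromDatanode wrapper):

import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;

public final class PipelineReportWiringSketch {
  public static void main(String[] args) {
    EventQueue eventQueue = new EventQueue();
    // The real SCM registers PipelineReportHandler here; a lambda
    // stands in for it in this sketch.
    eventQueue.addHandler(SCMEvents.PIPELINE_REPORT,
        (pipelineReport, publisher) ->
            System.out.println("pipeline report from "
                + pipelineReport.getDatanodeDetails()));
  }
}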
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index fca08bd..58da1cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -363,7 +365,8 @@ public class SCMNodeManager
*/
@Override
public RegisteredCommand register(
- DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
+ DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
+ PipelineReportsProto pipelineReportsProto) {
InetAddress dnAddress = Server.getRemoteIp();
if (dnAddress != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index b435e77..ddbba82 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,17 +19,13 @@
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Set;
-
/**
* Handles Stale node event.
*/
@@ -37,22 +33,17 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
private final Node2ContainerMap node2ContainerMap;
- private final Mapping containerManager;
+ private final PipelineSelector pipelineSelector;
public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
- Mapping containerManager) {
+ PipelineSelector pipelineSelector) {
this.node2ContainerMap = node2ContainerMap;
- this.containerManager = containerManager;
+ this.pipelineSelector = pipelineSelector;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- Set<PipelineID> pipelineIDs =
- containerManager.getPipelineOnDatanode(datanodeDetails);
- for (PipelineID id : pipelineIDs) {
- LOG.info("closing pipeline {}.", id);
- publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
- }
+ pipelineSelector.handleStaleNode(datanodeDetails);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 97c254b..549080a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -18,13 +18,9 @@
package org.apache.hadoop.hdds.scm.node.states;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -32,34 +28,29 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
- .DUPLICATE_DATANODE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.NO_SUCH_DATANODE;
/**
* This data structure maintains the list of containers that is on a datanode.
* This information is built from the DN container reports.
*/
-public class Node2ContainerMap {
- private final Map<UUID, Set<ContainerID>> dn2ContainerMap;
+public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
/**
* Constructs a Node2ContainerMap Object.
*/
public Node2ContainerMap() {
- dn2ContainerMap = new ConcurrentHashMap<>();
+ super();
}
/**
- * Returns true if this a datanode that is already tracked by
- * Node2ContainerMap.
+ * Returns an empty set if there are no containers associated with this datanode ID.
*
- * @param datanodeID - UUID of the Datanode.
- * @return True if this is tracked, false if this map does not know about it.
+ * @param datanode - Datanode UUID.
+ * @return Set of containers; empty if none are known.
*/
- public boolean isKnownDatanode(UUID datanodeID) {
- Preconditions.checkNotNull(datanodeID);
- return dn2ContainerMap.containsKey(datanodeID);
+ public Set<ContainerID> getContainers(UUID datanode) {
+ return getObjects(datanode);
}
/**
@@ -70,13 +61,7 @@ public class Node2ContainerMap {
*/
public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
throws SCMException {
- Preconditions.checkNotNull(containerIDs);
- Preconditions.checkNotNull(datanodeID);
- if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
- != null) {
- throw new SCMException("Node already exists in the map",
- DUPLICATE_DATANODE);
- }
+ super.insertNewDatanode(datanodeID, containerIDs);
}
/**
@@ -91,103 +76,15 @@ public class Node2ContainerMap {
Set<ContainerID> containers) throws SCMException {
Preconditions.checkNotNull(datanodeID);
Preconditions.checkNotNull(containers);
- if (dn2ContainerMap
+ if (dn2ObjectMap
.computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
== null) {
throw new SCMException("No such datanode", NO_SUCH_DATANODE);
}
}
- /**
- * Removes datanode Entry from the map.
- *
- * @param datanodeID - Datanode ID.
- */
- public void removeDatanode(UUID datanodeID) {
- Preconditions.checkNotNull(datanodeID);
- dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> null);
- }
-
- /**
- * Returns null if there no containers associated with this datanode ID.
- *
- * @param datanode - UUID
- * @return Set of containers or Null.
- */
- public Set<ContainerID> getContainers(UUID datanode) {
- Preconditions.checkNotNull(datanode);
- return dn2ContainerMap.computeIfPresent(datanode, (k, v) ->
- Collections.unmodifiableSet(v));
- }
-
- public ReportResult processReport(UUID datanodeID, Set<ContainerID>
- containers) {
- Preconditions.checkNotNull(datanodeID);
- Preconditions.checkNotNull(containers);
-
- if (!isKnownDatanode(datanodeID)) {
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.NEW_DATANODE_FOUND)
- .setNewContainers(containers)
- .build();
- }
-
- // Conditions like Zero length containers should be handled by removeAll.
- Set<ContainerID> currentSet = dn2ContainerMap.get(datanodeID);
- TreeSet<ContainerID> newContainers = new TreeSet<>(containers);
- newContainers.removeAll(currentSet);
-
- TreeSet<ContainerID> missingContainers = new TreeSet<>(currentSet);
- missingContainers.removeAll(containers);
-
- if (newContainers.isEmpty() && missingContainers.isEmpty()) {
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.ALL_IS_WELL)
- .build();
- }
-
- if (newContainers.isEmpty() && !missingContainers.isEmpty()) {
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.MISSING_CONTAINERS)
- .setMissingContainers(missingContainers)
- .build();
- }
-
- if (!newContainers.isEmpty() && missingContainers.isEmpty()) {
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.NEW_CONTAINERS_FOUND)
- .setNewContainers(newContainers)
- .build();
- }
-
- if (!newContainers.isEmpty() && !missingContainers.isEmpty()) {
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND)
- .setNewContainers(newContainers)
- .setMissingContainers(missingContainers)
- .build();
- }
-
- // default status & Make compiler happy
- return ReportResult.ReportResultBuilder.newBuilder()
- .setStatus(ReportStatus.ALL_IS_WELL)
- .build();
- }
-
- /**
- * Results possible from processing a container report by
- * Node2ContainerMapper.
- */
- public enum ReportStatus {
- ALL_IS_WELL,
- MISSING_CONTAINERS,
- NEW_CONTAINERS_FOUND,
- MISSING_AND_NEW_CONTAINERS_FOUND,
- NEW_DATANODE_FOUND
- }
-
@VisibleForTesting
public int size() {
- return dn2ContainerMap.size();
+ return dn2ObjectMap.size();
}
}
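
To illustrate the refactored report path, a sketch (assumed to run inside a
test method that declares throws SCMException; the container IDs are
arbitrary):

    Node2ContainerMap map = new Node2ContainerMap();
    UUID dn = UUID.randomUUID();
    map.insertNewDatanode(dn, Collections.singleton(ContainerID.valueof(1L)));

    // A later report that adds container 2 and no longer lists container 1:
    ReportResult<ContainerID> result =
        map.processReport(dn, Collections.singleton(ContainerID.valueof(2L)));
    // Both a new and a missing entry were detected:
    assert result.getStatus()
        == ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND;
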
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
new file mode 100644
index 0000000..e49a79c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+import java.util.UUID;
+import java.util.Set;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.HashSet;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
+
+/**
+ * This data structure maintains the set of objects (for example, containers
+ * or pipelines) that are on a datanode. This information is built from
+ * datanode reports.
+ */
+public class Node2ObjectsMap<T> {
+ protected final Map<UUID, Set<T>> dn2ObjectMap;
+
+ /**
+ * Constructs a Node2ObjectsMap Object.
+ */
+ public Node2ObjectsMap() {
+ dn2ObjectMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Returns true if this is a datanode that is already tracked by
+ * this map.
+ *
+ * @param datanodeID - UUID of the Datanode.
+ * @return True if this is tracked, false if this map does not know about it.
+ */
+ public boolean isKnownDatanode(UUID datanodeID) {
+ Preconditions.checkNotNull(datanodeID);
+ return dn2ObjectMap.containsKey(datanodeID);
+ }
+
+ /**
+ * Inserts a new datanode into the map.
+ *
+ * @param datanodeID - Datanode UUID.
+ * @param containerIDs - Set of object IDs tracked on the datanode.
+ */
+ public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
+ throws SCMException {
+ Preconditions.checkNotNull(containerIDs);
+ Preconditions.checkNotNull(datanodeID);
+ if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+ != null) {
+ throw new SCMException("Node already exists in the map",
+ DUPLICATE_DATANODE);
+ }
+ }
+
+ /**
+ * Removes datanode Entry from the map.
+ *
+ * @param datanodeID - Datanode ID.
+ */
+ void removeDatanode(UUID datanodeID) {
+ Preconditions.checkNotNull(datanodeID);
+ dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
+ }
+
+ /**
+ * Returns the set of objects associated with this datanode ID, or an
+ * empty set if none are known.
+ *
+ * @param datanode - Datanode UUID.
+ * @return Set of objects; empty if none are known.
+ */
+ Set<T> getObjects(UUID datanode) {
+ Preconditions.checkNotNull(datanode);
+ final Set<T> s = dn2ObjectMap.get(datanode);
+ return s != null ? Collections.unmodifiableSet(s) : Collections.emptySet();
+ }
+
+ public ReportResult.ReportResultBuilder<T> newBuilder() {
+ return new ReportResult.ReportResultBuilder<>();
+ }
+
+ public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
+ Preconditions.checkNotNull(datanodeID);
+ Preconditions.checkNotNull(objects);
+
+ if (!isKnownDatanode(datanodeID)) {
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
+ .setNewEntries(objects)
+ .build();
+ }
+
+ // Edge cases such as zero-length reports are handled by removeAll.
+ Set<T> currentSet = dn2ObjectMap.get(datanodeID);
+ TreeSet<T> newObjects = new TreeSet<>(objects);
+ newObjects.removeAll(currentSet);
+
+ TreeSet<T> missingObjects = new TreeSet<>(currentSet);
+ missingObjects.removeAll(objects);
+
+ if (newObjects.isEmpty() && missingObjects.isEmpty()) {
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+ .build();
+ }
+
+ if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
+ .setMissingEntries(missingObjects)
+ .build();
+ }
+
+ if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
+ .setNewEntries(newObjects)
+ .build();
+ }
+
+ if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
+ .setNewEntries(newObjects)
+ .setMissingEntries(missingObjects)
+ .build();
+ }
+
+ // Default status; unreachable, but keeps the compiler happy.
+ return newBuilder()
+ .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+ .build();
+ }
+
+ @VisibleForTesting
+ public int size() {
+ return dn2ObjectMap.size();
+ }
+}
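
The generic base class makes further per-node maps cheap to add. A
hypothetical subclass (DiskID is an invented type, not in this patch); note
that it must live in org.apache.hadoop.hdds.scm.node.states because
getObjects() is package-private, and that element types should be Comparable
when processReport() is used, since it sorts entries into TreeSets:

    public class Node2DiskMap extends Node2ObjectsMap<DiskID> {
      public Set<DiskID> getDisks(UUID datanode) {
        return getObjects(datanode);   // inherited null-safe lookup
      }
    }
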
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index 9bb6cf1..0c7610f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -19,83 +19,92 @@
package org.apache.hadoop.hdds.scm.node.states;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-
import java.util.Collections;
import java.util.Set;
import com.google.common.base.Preconditions;
/**
- * A Container Report gets processsed by the Node2Container and returns
- * Report Result class.
+ * A container/pipeline report gets processed by
+ * Node2ContainerMap/Node2PipelineMap, which return an instance of this
+ * ReportResult class.
*/
-public class ReportResult {
- private Node2ContainerMap.ReportStatus status;
- private Set<ContainerID> missingContainers;
- private Set<ContainerID> newContainers;
-
- ReportResult(Node2ContainerMap.ReportStatus status,
- Set<ContainerID> missingContainers,
- Set<ContainerID> newContainers) {
+public final class ReportResult<T> {
+ private ReportStatus status;
+ private Set<T> missingEntries;
+ private Set<T> newEntries;
+
+ private ReportResult(ReportStatus status,
+ Set<T> missingEntries,
+ Set<T> newEntries) {
this.status = status;
- Preconditions.checkNotNull(missingContainers);
- Preconditions.checkNotNull(newContainers);
- this.missingContainers = missingContainers;
- this.newContainers = newContainers;
+ Preconditions.checkNotNull(missingEntries);
+ Preconditions.checkNotNull(newEntries);
+ this.missingEntries = missingEntries;
+ this.newEntries = newEntries;
}
- public Node2ContainerMap.ReportStatus getStatus() {
+ public ReportStatus getStatus() {
return status;
}
- public Set<ContainerID> getMissingContainers() {
- return missingContainers;
+ public Set<T> getMissingEntries() {
+ return missingEntries;
}
- public Set<ContainerID> getNewContainers() {
- return newContainers;
+ public Set<T> getNewEntries() {
+ return newEntries;
}
- static class ReportResultBuilder {
- private Node2ContainerMap.ReportStatus status;
- private Set<ContainerID> missingContainers;
- private Set<ContainerID> newContainers;
-
- static ReportResultBuilder newBuilder() {
- return new ReportResultBuilder();
- }
-
- public ReportResultBuilder setStatus(
- Node2ContainerMap.ReportStatus newstatus) {
- this.status = newstatus;
+ /**
+ * Builder for the result of processing a report for a node2Object map.
+ * @param <T> type of the report entries.
+ */
+ public static class ReportResultBuilder<T> {
+ private ReportStatus status;
+ private Set<T> missingEntries;
+ private Set<T> newEntries;
+
+ public ReportResultBuilder<T> setStatus(
+ ReportStatus newStatus) {
+ this.status = newStatus;
return this;
}
- public ReportResultBuilder setMissingContainers(
- Set<ContainerID> missingContainersLit) {
- this.missingContainers = missingContainersLit;
+ public ReportResultBuilder<T> setMissingEntries(
+ Set<T> missingEntriesList) {
+ this.missingEntries = missingEntriesList;
return this;
}
- public ReportResultBuilder setNewContainers(
- Set<ContainerID> newContainersList) {
- this.newContainers = newContainersList;
+ public ReportResultBuilder<T> setNewEntries(
+ Set<T> newEntriesList) {
+ this.newEntries = newEntriesList;
return this;
}
- ReportResult build() {
+ public ReportResult<T> build() {
- Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
- Set<ContainerID> nullSafeNewContainers = this.newContainers;
- if (nullSafeNewContainers == null) {
- nullSafeNewContainers = Collections.emptySet();
+ Set<T> nullSafeMissingEntries = this.missingEntries;
+ Set<T> nullSafeNewEntries = this.newEntries;
+ if (nullSafeNewEntries == null) {
+ nullSafeNewEntries = Collections.emptySet();
}
- if (nullSafeMissingContainers == null) {
- nullSafeMissingContainers = Collections.emptySet();
+ if (nullSafeMissingEntries == null) {
+ nullSafeMissingEntries = Collections.emptySet();
}
- return new ReportResult(status, nullSafeMissingContainers,
- nullSafeNewContainers);
+ return new ReportResult<T>(status, nullSafeMissingEntries,
+ nullSafeNewEntries);
}
}
+
+ /**
+ * Results possible from processing a report.
+ */
+ public enum ReportStatus {
+ ALL_IS_WELL,
+ MISSING_ENTRIES,
+ NEW_ENTRIES_FOUND,
+ MISSING_AND_NEW_ENTRIES_FOUND,
+ NEW_DATANODE_FOUND,
+ }
}
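
Builder usage stays null-safe: build() substitutes empty sets for whichever
entry collections were never set. A sketch (newPipelines assumed):

    ReportResult<PipelineID> r =
        new ReportResult.ReportResultBuilder<PipelineID>()
            .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
            .setNewEntries(newPipelines)   // missing entries left unset
            .build();
    assert r.getMissingEntries().isEmpty();
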
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
index 363ce71..87f2222 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -16,19 +16,15 @@
*
*/
-package org.apache.hadoop.hdds.scm.pipelines;
+package org.apache.hadoop.hdds.scm.node.states;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
/**
* This data structure maintains the list of pipelines which the given datanode is a part of. This
@@ -36,33 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
*
* <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
*/
-public class Node2PipelineMap {
- private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
+public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
/** Constructs a Node2PipelineMap Object. */
public Node2PipelineMap() {
- dn2PipelineMap = new ConcurrentHashMap<>();
- }
-
- /**
- * Returns true if this a datanode that is already tracked by Node2PipelineMap.
- *
- * @param datanodeID - UUID of the Datanode.
- * @return True if this is tracked, false if this map does not know about it.
- */
- private boolean isKnownDatanode(UUID datanodeID) {
- Preconditions.checkNotNull(datanodeID);
- return dn2PipelineMap.containsKey(datanodeID);
- }
-
- /**
- * Removes datanode Entry from the map.
- *
- * @param datanodeID - Datanode ID.
- */
- public synchronized void removeDatanode(UUID datanodeID) {
- Preconditions.checkNotNull(datanodeID);
- dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
+ super();
}
/**
@@ -72,9 +46,7 @@ public class Node2PipelineMap {
* @return Set of pipelines or Null.
*/
public Set<PipelineID> getPipelines(UUID datanode) {
- Preconditions.checkNotNull(datanode);
- final Set<PipelineID> s = dn2PipelineMap.get(datanode);
- return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+ return getObjects(datanode);
}
/**
@@ -85,7 +57,7 @@ public class Node2PipelineMap {
public synchronized void addPipeline(Pipeline pipeline) {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
- dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+ dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
.add(pipeline.getId());
}
}
@@ -93,16 +65,11 @@ public class Node2PipelineMap {
public synchronized void removePipeline(Pipeline pipeline) {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
- dn2PipelineMap.computeIfPresent(
- dnId,
+ dn2ObjectMap.computeIfPresent(dnId,
(k, v) -> {
v.remove(pipeline.getId());
return v;
});
}
}
-
- public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
- return Collections.unmodifiableMap(dn2PipelineMap);
- }
}
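
A short usage sketch of the slimmed-down map (pipeline and dnUuid assumed):

    Node2PipelineMap n2p = new Node2PipelineMap();
    n2p.addPipeline(pipeline);          // indexes every member datanode
    Set<PipelineID> ids = n2p.getPipelines(dnUuid);   // never null
    n2p.removePipeline(pipeline);       // drops the ID from each member
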
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
index 733dec5..e49678f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
@@ -17,22 +17,36 @@
package org.apache.hadoop.hdds.scm.pipelines;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handles pipeline close event.
*/
public class PipelineCloseHandler implements EventHandler<PipelineID> {
- private final Mapping mapping;
- public PipelineCloseHandler(Mapping mapping) {
- this.mapping = mapping;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(PipelineCloseHandler.class);
+
+ private final PipelineSelector pipelineSelector;
+ public PipelineCloseHandler(PipelineSelector pipelineSelector) {
+ this.pipelineSelector = pipelineSelector;
}
@Override
public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
- mapping.handlePipelineClose(pipelineID);
+ Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+ try {
+ if (pipeline != null) {
+ pipelineSelector.finalizePipeline(pipeline);
+ } else {
+ LOG.debug("pipeline:{} not found", pipelineID);
+ }
+ } catch (Exception e) {
+ LOG.info("failed to close pipeline:{}", pipelineID, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 07ff2b0..ca2e878 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.pipelines;
import java.util.ArrayList;
import java.util.LinkedList;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -36,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
- private final ArrayList<ActivePipelines> activePipelines;
+ protected final ArrayList<ActivePipelines> activePipelines;
public PipelineManager() {
activePipelines = new ArrayList<>();
@@ -45,7 +47,10 @@ public abstract class PipelineManager {
}
}
- private static class ActivePipelines {
+ /**
+ * List of active pipelines.
+ */
+ public static class ActivePipelines {
private final List<PipelineID> activePipelines;
private final AtomicInteger pipelineIndex;
@@ -55,10 +60,12 @@ public abstract class PipelineManager {
}
void addPipeline(PipelineID pipelineID) {
- activePipelines.add(pipelineID);
+ if (!activePipelines.contains(pipelineID)) {
+ activePipelines.add(pipelineID);
+ }
}
- void removePipeline(PipelineID pipelineID) {
+ public void removePipeline(PipelineID pipelineID) {
activePipelines.remove(pipelineID);
}
@@ -117,17 +124,6 @@ public abstract class PipelineManager {
.addPipeline(pipeline.getId());
}
- protected static int getReplicationCount(ReplicationFactor factor) {
- switch (factor) {
- case ONE:
- return 1;
- case THREE:
- return 3;
- default:
- throw new IllegalArgumentException("Unexpected replication count");
- }
- }
-
public abstract Pipeline allocatePipeline(
ReplicationFactor replicationFactor);
@@ -137,6 +133,14 @@ public abstract class PipelineManager {
*/
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
+ public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+ if (pipeline.addMember(dn)
+ &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber())
+ && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) {
+ addOpenPipeline(pipeline);
+ }
+ }
+
/**
* Creates a pipeline with a specified replication factor and type.
* @param replicationFactor - Replication Factor.
@@ -157,27 +161,11 @@ public abstract class PipelineManager {
* Remove the pipeline from active allocation.
* @param pipeline pipeline to be finalized
*/
- public synchronized void finalizePipeline(Pipeline pipeline) {
- activePipelines.get(pipeline.getFactor().ordinal())
- .removePipeline(pipeline.getId());
- }
+ public abstract boolean finalizePipeline(Pipeline pipeline);
/**
* Close the pipeline.
* @param pipeline pipeline to be closed
*/
public abstract void closePipeline(Pipeline pipeline) throws IOException;
-
- /**
- * list members in the pipeline.
- * @return the datanode
- */
- public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
- throws IOException;
-
- /**
- * Update the datanode list of the pipeline.
- */
- public abstract void updatePipeline(PipelineID pipelineID,
- List<DatanodeDetails> newDatanodes) throws IOException;
}
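
The practical effect of processPipelineReport() on SCM restart: an OPEN
pipeline rejoins active allocation only once every member datanode has
reported it. A sketch for a factor-THREE pipeline (pipeline and datanodes
assumed to be restored from reports):

    manager.processPipelineReport(pipeline, dn1);  // 1 of 3 - still inactive
    manager.processPipelineReport(pipeline, dn2);  // 2 of 3 - still inactive
    manager.processPipelineReport(pipeline, dn3);  // 3 of 3 - addOpenPipeline()
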
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
new file mode 100644
index 0000000..933792b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.server
+ .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles Pipeline Reports from datanode.
+ */
+public class PipelineReportHandler implements
+ EventHandler<PipelineReportFromDatanode> {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PipelineReportHandler.class);
+ private final PipelineSelector pipelineSelector;
+
+ public PipelineReportHandler(PipelineSelector pipelineSelector) {
+ Preconditions.checkNotNull(pipelineSelector);
+ this.pipelineSelector = pipelineSelector;
+ }
+
+ @Override
+ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
+ EventPublisher publisher) {
+ Preconditions.checkNotNull(pipelineReportFromDatanode);
+ DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
+ PipelineReportsProto pipelineReport =
+ pipelineReportFromDatanode.getReport();
+ Preconditions.checkNotNull(dn, "Pipeline Report is "
+ + "missing DatanodeDetails.");
+ LOGGER.trace("Processing pipeline report for dn: {}", dn);
+ pipelineSelector.processPipelineReport(dn, pipelineReport);
+ }
+}
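
And the expected wiring for the new handler (a sketch; assumes the
PIPELINE_REPORT event this patch adds to SCMEvents and an existing queue):

    eventQueue.addHandler(SCMEvents.PIPELINE_REPORT,
        new PipelineReportHandler(pipelineSelector));
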
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org