You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/20 21:38:12 UTC
[1/3] hadoop git commit: Revert "HDDS-239. Add PipelineStateManager
to track pipeline state transition. Contributed by Mukul Kumar Singh."
Repository: hadoop
Updated Branches:
refs/heads/trunk c7ae55675 -> 347c95501
Revert "HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh."
This reverts commit 6837121a43231f854b0b22ad20330012439313ce.(Mixed with HDDS-260)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2acf8d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2acf8d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2acf8d5
Branch: refs/heads/trunk
Commit: d2acf8d560950f06ffbf5c217fbfab76cd70d5da
Parents: c7ae556
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:20:18 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:20:18 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 -
.../scm/container/common/helpers/Pipeline.java | 7 -
.../common/src/main/resources/ozone-default.xml | 12 -
.../common/statemachine/StateContext.java | 52 +---
.../states/endpoint/HeartbeatEndpointTask.java | 24 +-
.../StorageContainerDatanodeProtocol.proto | 4 +-
.../common/report/TestReportPublisher.java | 41 +++
.../endpoint/TestHeartbeatEndpointTask.java | 302 -------------------
.../common/states/endpoint/package-info.java | 18 --
.../hdds/scm/container/ContainerMapping.java | 4 -
.../hdds/scm/exceptions/SCMException.java | 1 -
.../hdds/scm/pipelines/PipelineManager.java | 64 ++--
.../hdds/scm/pipelines/PipelineSelector.java | 212 ++-----------
.../scm/pipelines/ratis/RatisManagerImpl.java | 33 +-
.../standalone/StandaloneManagerImpl.java | 21 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 14 -
16 files changed, 146 insertions(+), 668 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 6e940ad..71184cf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,11 +236,6 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
- public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
- "ozone.scm.pipeline.creation.lease.timeout";
-
- public static final String
- OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 534c9fd..c5794f4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -214,13 +214,6 @@ public class Pipeline {
}
/**
- * Update the State of the pipeline.
- */
- public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
- lifeCycleState = nextState;
- }
-
- /**
* Gets the pipeline Name.
*
* @return - Name of the pipeline
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a..5a1d26a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,17 +1085,5 @@
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
- <property>
- <name>ozone.scm.pipeline.creation.lease.timeout</name>
- <value>60s</value>
- <tag>OZONE, SCM, PIPELINE</tag>
- <description>
- Pipeline creation timeout in milliseconds to be used by SCM. When
- BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
- CREATING state, SCM will now wait for the configured amount of time
- to get COMPLETE_CREATE event if it doesn't receive it will move the
- pipeline to DELETING.
- </description>
- </property>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4951f2a..faaff69 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,18 +20,14 @@ import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus
- .CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +43,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@@ -64,7 +59,6 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
- private final Queue<ContainerAction> containerActions;
private DatanodeStateMachine.DatanodeStates state;
/**
@@ -82,7 +76,6 @@ public class StateContext {
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
- containerActions = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
}
@@ -205,47 +198,6 @@ public class StateContext {
return results;
}
-
- /**
- * Adds the ContainerAction to ContainerAction queue.
- *
- * @param containerAction ContainerAction to be added
- */
- public void addContainerAction(ContainerAction containerAction) {
- synchronized (containerActions) {
- containerActions.add(containerAction);
- }
- }
-
- /**
- * Returns all the pending ContainerActions from the ContainerAction queue,
- * or empty list if the queue is empty.
- *
- * @return List<ContainerAction>
- */
- public List<ContainerAction> getAllPendingContainerActions() {
- return getPendingContainerAction(Integer.MAX_VALUE);
- }
-
- /**
- * Returns pending ContainerActions from the ContainerAction queue with a
- * max limit on list size, or empty list if the queue is empty.
- *
- * @return List<ContainerAction>
- */
- public List<ContainerAction> getPendingContainerAction(int maxLimit) {
- List<ContainerAction> results = new ArrayList<>();
- synchronized (containerActions) {
- containerActions.parallelStream().limit(maxLimit).collect(Collectors.toList());
- ContainerAction action = containerActions.poll();
- while(results.size() < maxLimit && action != null) {
- results.add(action);
- action = containerActions.poll();
- }
- }
- return results;
- }
-
/**
* Returns the next task to get executed by the datanode state machine.
* @return A callable that will be executed by the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 214e1cd..260a245 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,10 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -50,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZonedDateTime;
-import java.util.List;
import java.util.concurrent.Callable;
/**
@@ -112,7 +107,7 @@ public class HeartbeatEndpointTask
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
- addContainerActions(requestBuilder);
+
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto);
@@ -145,23 +140,6 @@ public class HeartbeatEndpointTask
}
/**
- * Adds all the pending ContainerActions to the heartbeat.
- *
- * @param requestBuilder builder to which the report has to be added.
- */
- private void addContainerActions(
- SCMHeartbeatRequestProto.Builder requestBuilder) {
- List<ContainerAction> actions = context.getAllPendingContainerActions();
- if (!actions.isEmpty()) {
- ContainerActionsProto cap = ContainerActionsProto.newBuilder()
- .addAllContainerActions(actions)
- .build();
- requestBuilder.setContainerActions(cap);
- }
- }
-
-
- /**
* Returns a builder class for HeartbeatEndpointTask task.
* @return Builder.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index d89567b..4238389 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
- optional CommandStatusReportsProto commandStatusReport = 4;
- optional ContainerActionsProto containerActions = 5;
+ optional ContainerActionsProto containerActions = 4;
+ optional CommandStatusReportsProto commandStatusReport = 5;
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 811599f..a0db2e8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,8 +28,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -171,6 +178,22 @@ public class TestReportPublisher {
executorService.shutdown();
}
+ @Test
+ public void testAddingReportToHeartbeat() {
+ GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
+ GeneratedMessage containerReport = ContainerReportsProto
+ .getDefaultInstance();
+ SCMHeartbeatRequestProto.Builder heartbeatBuilder =
+ SCMHeartbeatRequestProto.newBuilder();
+ heartbeatBuilder.setDatanodeDetails(
+ getDatanodeDetails().getProtoBufMessage());
+ addReport(heartbeatBuilder, nodeReport);
+ addReport(heartbeatBuilder, containerReport);
+ SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
+ Assert.assertTrue(heartbeat.hasNodeReport());
+ Assert.assertTrue(heartbeat.hasContainerReport());
+ }
+
/**
* Get a datanode details.
*
@@ -199,4 +222,22 @@ public class TestReportPublisher {
return builder.build();
}
+ /**
+ * Adds the report to heartbeat.
+ *
+ * @param requestBuilder builder to which the report has to be added.
+ * @param report the report to be added.
+ */
+ private static void addReport(SCMHeartbeatRequestProto.Builder
+ requestBuilder, GeneratedMessage report) {
+ String reportName = report.getDescriptorForType().getFullName();
+ for (Descriptors.FieldDescriptor descriptor :
+ SCMHeartbeatRequestProto.getDescriptor().getFields()) {
+ String heartbeatFieldName = descriptor.getMessageType().getFullName();
+ if (heartbeatFieldName.equals(reportName)) {
+ requestBuilder.setField(descriptor, report);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
deleted file mode 100644
index 87bd811..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.ozone.container.common.statemachine
- .DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
- .DatanodeStateMachine.DatanodeStates;
-import org.apache.hadoop.ozone.container.common.statemachine
- .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.protocolPB
- .StorageContainerDatanodeProtocolClientSideTranslatorPB;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-
-/**
- * This class tests the functionality of HeartbeatEndpointTask.
- */
-public class TestHeartbeatEndpointTask {
-
-
- @Test
- public void testheartbeatWithoutReports() throws Exception {
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertFalse(heartbeat.hasNodeReport());
- Assert.assertFalse(heartbeat.hasContainerReport());
- Assert.assertFalse(heartbeat.hasCommandStatusReport());
- Assert.assertFalse(heartbeat.hasContainerActions());
- }
-
- @Test
- public void testheartbeatWithNodeReports() throws Exception {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
-
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
- context.addReport(NodeReportProto.getDefaultInstance());
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertTrue(heartbeat.hasNodeReport());
- Assert.assertFalse(heartbeat.hasContainerReport());
- Assert.assertFalse(heartbeat.hasCommandStatusReport());
- Assert.assertFalse(heartbeat.hasContainerActions());
- }
-
- @Test
- public void testheartbeatWithContainerReports() throws Exception {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
-
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
- context.addReport(ContainerReportsProto.getDefaultInstance());
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertFalse(heartbeat.hasNodeReport());
- Assert.assertTrue(heartbeat.hasContainerReport());
- Assert.assertFalse(heartbeat.hasCommandStatusReport());
- Assert.assertFalse(heartbeat.hasContainerActions());
- }
-
- @Test
- public void testheartbeatWithCommandStatusReports() throws Exception {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
-
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertFalse(heartbeat.hasNodeReport());
- Assert.assertFalse(heartbeat.hasContainerReport());
- Assert.assertTrue(heartbeat.hasCommandStatusReport());
- Assert.assertFalse(heartbeat.hasContainerActions());
- }
-
- @Test
- public void testheartbeatWithContainerActions() throws Exception {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
-
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
- context.addContainerAction(getContainerAction());
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertFalse(heartbeat.hasNodeReport());
- Assert.assertFalse(heartbeat.hasContainerReport());
- Assert.assertFalse(heartbeat.hasCommandStatusReport());
- Assert.assertTrue(heartbeat.hasContainerActions());
- }
-
- @Test
- public void testheartbeatWithAllReports() throws Exception {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
-
- StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
- Mockito.mock(
- StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
- .forClass(SCMHeartbeatRequestProto.class);
- Mockito.when(scm.sendHeartbeat(argument.capture()))
- .thenAnswer(invocation ->
- SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(
- ((SCMHeartbeatRequestProto)invocation.getArgument(0))
- .getDatanodeDetails().getUuid())
- .build());
-
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
- context.addReport(NodeReportProto.getDefaultInstance());
- context.addReport(ContainerReportsProto.getDefaultInstance());
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
- context.addContainerAction(getContainerAction());
- endpointTask.call();
- SCMHeartbeatRequestProto heartbeat = argument.getValue();
- Assert.assertTrue(heartbeat.hasDatanodeDetails());
- Assert.assertTrue(heartbeat.hasNodeReport());
- Assert.assertTrue(heartbeat.hasContainerReport());
- Assert.assertTrue(heartbeat.hasCommandStatusReport());
- Assert.assertTrue(heartbeat.hasContainerActions());
- }
-
- /**
- * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
- *
- * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
- *
- * @return HeartbeatEndpointTask
- */
- private HeartbeatEndpointTask getHeartbeatEndpointTask(
- StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
- Configuration conf = new OzoneConfiguration();
- StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
- return getHeartbeatEndpointTask(conf, context, proxy);
-
- }
-
- /**
- * Creates HeartbeatEndpointTask with the given conf, context and
- * StorageContainerManager client side proxy.
- *
- * @param conf Configuration
- * @param context StateContext
- * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
- *
- * @return HeartbeatEndpointTask
- */
- private HeartbeatEndpointTask getHeartbeatEndpointTask(
- Configuration conf,
- StateContext context,
- StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
- DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
- .setUuid(UUID.randomUUID().toString())
- .setHostName("localhost")
- .setIpAddress("127.0.0.1")
- .build();
- EndpointStateMachine endpointStateMachine = Mockito
- .mock(EndpointStateMachine.class);
- Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
- return HeartbeatEndpointTask.newBuilder()
- .setConfig(conf)
- .setDatanodeDetails(datanodeDetails)
- .setContext(context)
- .setEndpointStateMachine(endpointStateMachine)
- .build();
- }
-
- private ContainerAction getContainerAction() {
- ContainerAction.Builder builder = ContainerAction.newBuilder();
- ContainerInfo containerInfo = ContainerInfo.newBuilder()
- .setContainerID(1L)
- .build();
- builder.setContainer(containerInfo)
- .setAction(ContainerAction.Action.CLOSE)
- .setReason(ContainerAction.Reason.CONTAINER_FULL);
- return builder.build();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
deleted file mode 100644
index d120a5c..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index f07d22b..26f4d86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,10 +658,6 @@ public class ContainerMapping implements Mapping {
if (containerStore != null) {
containerStore.close();
}
-
- if (pipelineSelector != null) {
- pipelineSelector.shutdown();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 0085542..d7d70ef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,7 +107,6 @@ public class SCMException extends IOException {
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
- FAILED_TO_CHANGE_PIPELINE_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 77d8211..a041973 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,16 +59,41 @@ public abstract class PipelineManager {
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(
- ReplicationFactor replicationFactor, ReplicationType replicationType) {
- Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
+ ReplicationFactor replicationFactor, ReplicationType replicationType)
+ throws IOException {
+ /**
+ * In the Ozone world, we have a very simple policy.
+ *
+ * 1. Try to create a pipeline if there are enough free nodes.
+ *
+ * 2. This allows all nodes to part of a pipeline quickly.
+ *
+ * 3. if there are not enough free nodes, return pipeline in a
+ * round-robin fashion.
+ *
+ * TODO: Might have to come up with a better algorithm than this.
+ * Create a new placement policy that returns pipelines in round robin
+ * fashion.
+ */
+ Pipeline pipeline = allocatePipeline(replicationFactor);
if (pipeline != null) {
- LOG.debug("re-used pipeline:{} for container with " +
+ LOG.debug("created new pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
+ activePipelines.add(pipeline);
+ activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+ node2PipelineMap.addPipeline(pipeline);
+ } else {
+ pipeline = findOpenPipeline(replicationType, replicationFactor);
+ if (pipeline != null) {
+ LOG.debug("re-used pipeline:{} for container with " +
+ "replicationType:{} replicationFactor:{}",
+ pipeline.getPipelineName(), replicationType, replicationFactor);
+ }
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
- " operational pipeline.");
+ "free nodes or operational pipeline.");
return null;
} else {
return pipeline;
@@ -84,7 +109,7 @@ public abstract class PipelineManager {
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
- // 1. Check if pipeline already exists
+ // 1. Check if pipeline channel already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -107,13 +132,7 @@ public abstract class PipelineManager {
}
public abstract Pipeline allocatePipeline(
- ReplicationFactor replicationFactor);
-
- /**
- * Initialize the pipeline
- * TODO: move the initialization to Ozone Client later
- */
- public abstract void initializePipeline(Pipeline pipeline) throws IOException;
+ ReplicationFactor replicationFactor) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
@@ -160,23 +179,12 @@ public abstract class PipelineManager {
}
/**
- * Creates a pipeline with a specified replication factor and type.
- * @param replicationFactor - Replication Factor.
- * @param replicationType - Replication Type.
+ * Creates a pipeline from a specified set of Nodes.
+ * @param pipelineID - Name of the pipeline
+ * @param datanodes - The list of datanodes that make this pipeline.
*/
- public Pipeline createPipeline(ReplicationFactor replicationFactor,
- ReplicationType replicationType) throws IOException {
- Pipeline pipeline = allocatePipeline(replicationFactor);
- if (pipeline != null) {
- LOG.debug("created new pipeline:{} for container with "
- + "replicationType:{} replicationFactor:{}",
- pipeline.getPipelineName(), replicationType, replicationFactor);
- activePipelines.add(pipeline);
- activePipelineMap.put(pipeline.getPipelineName(), pipeline);
- node2PipelineMap.addPipeline(pipeline);
- }
- return pipeline;
- }
+ public abstract void createPipeline(String pipelineID,
+ List<DatanodeDetails> datanodes) throws IOException;
/**
* Close the pipeline with the given clusterId.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 08710e7..2955af5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -34,28 +33,17 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.statemachine
- .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
- .FAILED_TO_CHANGE_PIPELINE_STATE;
-
/**
* Sends the request to the right pipeline manager.
*/
@@ -69,10 +57,6 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
- private final LeaseManager<Pipeline> pipelineLeaseManager;
- private final StateMachine<LifeCycleState,
- HddsProtos.LifeCycleEvent> stateMachine;
-
/**
* Constructs a pipeline Selector.
*
@@ -93,74 +77,6 @@ public class PipelineSelector {
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
- // Initialize the container state machine.
- Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
- long pipelineCreationLeaseTimeout = conf.getTimeDuration(
- ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- LOG.trace("Starting Pipeline Lease Manager.");
- pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
- pipelineLeaseManager.start();
-
- // These are the steady states of a container.
- finalStates.add(HddsProtos.LifeCycleState.OPEN);
- finalStates.add(HddsProtos.LifeCycleState.CLOSED);
-
- this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
- finalStates);
- initializeStateMachine();
- }
-
- /**
- * Event and State Transition Mapping:
- *
- * State: ALLOCATED ---------------> CREATING
- * Event: CREATE
- *
- * State: CREATING ---------------> OPEN
- * Event: CREATED
- *
- * State: OPEN ---------------> CLOSING
- * Event: FINALIZE
- *
- * State: CLOSING ---------------> CLOSED
- * Event: CLOSE
- *
- * State: CREATING ---------------> CLOSED
- * Event: TIMEOUT
- *
- *
- * Container State Flow:
- *
- * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
- * (CREATE) | (CREATED) (FINALIZE) |
- * | |
- * | |
- * |(TIMEOUT) |(CLOSE)
- * | |
- * +--------> [CLOSED] <--------+
- */
- private void initializeStateMachine() {
- stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
- HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleEvent.CREATE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleEvent.CREATED);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleEvent.FINALIZE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.CLOSE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
@@ -172,14 +88,15 @@ public class PipelineSelector {
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(
- List<DatanodeDetails> nodes, ReplicationType replicationType,
- ReplicationFactor replicationFactor, String name) {
+ List<DatanodeDetails> nodes, LifeCycleState state,
+ ReplicationType replicationType, ReplicationFactor replicationFactor,
+ String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
- // A new pipeline always starts in allocated state
- Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
- replicationType, replicationFactor, name);
+ Pipeline
+ pipeline = new Pipeline(leaderId, state, replicationType,
+ replicationFactor, name);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@@ -258,35 +175,8 @@ public class PipelineSelector {
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" ReplicationFactor {}", replicationType.toString(),
replicationFactor.toString());
-
- /**
- * In the Ozone world, we have a very simple policy.
- *
- * 1. Try to create a pipeline if there are enough free nodes.
- *
- * 2. This allows all nodes to part of a pipeline quickly.
- *
- * 3. if there are not enough free nodes, return already allocated pipeline
- * in a round-robin fashion.
- *
- * TODO: Might have to come up with a better algorithm than this.
- * Create a new placement policy that returns pipelines in round robin
- * fashion.
- */
- Pipeline pipeline =
- manager.createPipeline(replicationFactor, replicationType);
- if (pipeline == null) {
- // try to return a pipeline from already allocated pipelines
- pipeline = manager.getPipeline(replicationFactor, replicationType);
- } else {
- // if a new pipeline is created, initialize its state machine
- updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
-
- //TODO: move the initialization of pipeline to Ozone Client
- manager.initializePipeline(pipeline);
- updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
- }
- return pipeline;
+ return manager.
+ getPipeline(replicationFactor, replicationType);
}
/**
@@ -304,6 +194,19 @@ public class PipelineSelector {
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
+ /**
+ * Creates a pipeline from a specified set of Nodes.
+ */
+
+ public void createPipeline(ReplicationType replicationType, String
+ pipelineID, List<DatanodeDetails> datanodes) throws IOException {
+ PipelineManager manager = getPipelineManager(replicationType);
+ Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+ LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
+ datanodes.stream().map(DatanodeDetails::toString)
+ .collect(Collectors.joining(",")));
+ manager.createPipeline(pipelineID, datanodes);
+ }
/**
* Close the pipeline with the given clusterId.
@@ -348,77 +251,12 @@ public class PipelineSelector {
}
public void removePipeline(UUID dnId) {
- Set<Pipeline> pipelineSet =
+ Set<Pipeline> pipelineChannelSet =
node2PipelineMap.getPipelines(dnId);
- for (Pipeline pipeline : pipelineSet) {
- getPipelineManager(pipeline.getType())
- .removePipeline(pipeline);
+ for (Pipeline pipelineChannel : pipelineChannelSet) {
+ getPipelineManager(pipelineChannel.getType())
+ .removePipeline(pipelineChannel);
}
node2PipelineMap.removeDatanode(dnId);
}
-
- /**
- * Update the Pipeline State to the next state.
- *
- * @param pipeline - Pipeline
- * @param event - LifeCycle Event
- * @throws SCMException on Failure.
- */
- public void updatePipelineState(Pipeline pipeline,
- HddsProtos.LifeCycleEvent event) throws IOException {
- HddsProtos.LifeCycleState newState;
- try {
- newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
- } catch (InvalidStateTransitionException ex) {
- String error = String.format("Failed to update pipeline state %s, " +
- "reason: invalid state transition from state: %s upon " +
- "event: %s.",
- pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
- LOG.error(error);
- throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
- }
-
- // This is a post condition after executing getNextState.
- Preconditions.checkNotNull(newState);
- Preconditions.checkNotNull(pipeline);
- try {
- switch (event) {
- case CREATE:
- // Acquire lease on pipeline
- Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
- // Register callback to be executed in case of timeout
- pipelineLease.registerCallBack(() -> {
- updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
- return null;
- });
- break;
- case CREATED:
- // Release the lease on pipeline
- pipelineLeaseManager.release(pipeline);
- break;
-
- case FINALIZE:
- //TODO: cleanup pipeline by closing all the containers on the pipeline
- break;
-
- case CLOSE:
- case TIMEOUT:
- // TODO: Release the nodes here when pipelines are destroyed
- break;
- default:
- throw new SCMException("Unsupported pipeline LifeCycleEvent.",
- FAILED_TO_CHANGE_PIPELINE_STATE);
- }
-
- pipeline.setLifeCycleState(newState);
- } catch (LeaseException e) {
- throw new IOException("Lease Exception.", e);
- }
- }
-
- public void shutdown() {
- if (pipelineLeaseManager != null) {
- pipelineLeaseManager.shutdown();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index c726ef6..a8f8b20 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -71,7 +72,7 @@ public class RatisManagerImpl extends PipelineManager {
* Allocates a new ratis Pipeline from the free nodes.
*
* @param factor - One or Three
- * @return Pipeline.
+ * @return PipelineChannel.
*/
public Pipeline allocatePipeline(ReplicationFactor factor) {
List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -88,23 +89,35 @@ public class RatisManagerImpl extends PipelineManager {
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
- // Start all pipeline names with "Ratis", easy to grep the logs.
+ // Start all channel names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
- return PipelineSelector.newPipelineFromNodes(newNodesList,
- ReplicationType.RATIS, factor, pipelineName);
+ Pipeline pipeline=
+ PipelineSelector.newPipelineFromNodes(newNodesList,
+ LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
+ try (XceiverClientRatis client =
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+ client.createPipeline(pipeline.getPipelineName(), newNodesList);
+ } catch (IOException e) {
+ return null;
+ }
+ return pipeline;
}
}
}
return null;
}
- public void initializePipeline(Pipeline pipeline) throws IOException {
- //TODO:move the initialization from SCM to client
- try (XceiverClientRatis client =
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
- }
+ /**
+ * Creates a pipeline from a specified set of Nodes.
+ *
+ * @param pipelineID - Name of the pipeline
+ * @param datanodes - The list of datanodes that make this pipeline.
+ */
+ @Override
+ public void createPipeline(String pipelineID,
+ List<DatanodeDetails> datanodes) {
+
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index bb4951f..cf691bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -85,19 +86,29 @@ public class StandaloneManagerImpl extends PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
- LOG.info("Allocating a new standalone pipeline of size: {}", count);
- String pipelineName =
+ LOG.info("Allocating a new standalone pipeline channel of size: {}",
+ count);
+ String channelName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+ ReplicationFactor.ONE, channelName);
}
}
}
return null;
}
- public void initializePipeline(Pipeline pipeline) {
- // Nothing to be done for standalone pipeline
+ /**
+ * Creates a pipeline from a specified set of Nodes.
+ *
+ * @param pipelineID - Name of the pipeline
+ * @param datanodes - The list of datanodes that make this pipeline.
+ */
+ @Override
+ public void createPipeline(String pipelineID,
+ List<DatanodeDetails> datanodes) {
+ //return newPipelineFromNodes(datanodes, pipelineID);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index ffac6d5..bc3505f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
@@ -53,7 +51,6 @@ public class TestNode2PipelineMap {
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
- private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
@@ -69,7 +66,6 @@ public class TestNode2PipelineMap {
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
- pipelineSelector = mapping.getPipelineSelector();
}
/**
@@ -117,15 +113,5 @@ public class TestNode2PipelineMap {
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
-
- try {
- pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
- HddsProtos.LifeCycleEvent.CLOSE);
- Assert.fail("closing of pipeline without finalize should fail");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof SCMException);
- Assert.assertEquals(((SCMException)e).getResult(),
- SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: HDDS-260. Support in Datanode for sending
ContainerActions to SCM. Contributed by Nanda kumar.
Posted by xy...@apache.org.
HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/347c9550
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/347c9550
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/347c9550
Branch: refs/heads/trunk
Commit: 347c9550135ea10fd84d5007124452bf5f2d6619
Parents: 9be25e3
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:37:13 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:37:13 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 6 +
.../common/src/main/resources/ozone-default.xml | 10 +
.../common/statemachine/StateContext.java | 55 +++-
.../states/endpoint/HeartbeatEndpointTask.java | 33 +-
.../StorageContainerDatanodeProtocol.proto | 4 +-
.../common/report/TestReportPublisher.java | 41 ---
.../endpoint/TestHeartbeatEndpointTask.java | 300 +++++++++++++++++++
.../common/states/endpoint/package-info.java | 18 ++
8 files changed, 414 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0283615..fd4bf08 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -48,4 +48,10 @@ public final class HddsConfigKeys {
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
"60s";
+
+ public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT =
+ "hdds.container.action.max.limit";
+ public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
+ 20;
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a..84a3e0c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1098,4 +1098,14 @@
</description>
</property>
+ <property>
+ <name>hdds.container.action.max.limit</name>
+ <value>20</value>
+ <tag>DATANODE</tag>
+ <description>
+ Maximum number of Container Actions sent by the datanode to SCM in a
+ single heartbeat.
+ </description>
+ </property>
+
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index faaff69..7862cc6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,14 +20,18 @@ import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus
+ .CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@@ -59,6 +64,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
+ private final Queue<ContainerAction> containerActions;
private DatanodeStateMachine.DatanodeStates state;
/**
@@ -76,6 +82,7 @@ public class StateContext {
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
+ containerActions = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
}
@@ -187,15 +194,45 @@ public class StateContext {
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
- List<GeneratedMessage> results = new ArrayList<>();
synchronized (reports) {
- GeneratedMessage report = reports.poll();
- while(results.size() < maxLimit && report != null) {
- results.add(report);
- report = reports.poll();
- }
+ return reports.parallelStream().limit(maxLimit)
+ .collect(Collectors.toList());
+ }
+ }
+
+
+ /**
+ * Adds the ContainerAction to ContainerAction queue.
+ *
+ * @param containerAction ContainerAction to be added
+ */
+ public void addContainerAction(ContainerAction containerAction) {
+ synchronized (containerActions) {
+ containerActions.add(containerAction);
+ }
+ }
+
+ /**
+ * Returns all the pending ContainerActions from the ContainerAction queue,
+ * or empty list if the queue is empty.
+ *
+ * @return List<ContainerAction>
+ */
+ public List<ContainerAction> getAllPendingContainerActions() {
+ return getPendingContainerAction(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Returns pending ContainerActions from the ContainerAction queue with a
+ * max limit on list size, or empty list if the queue is empty.
+ *
+ * @return List<ContainerAction>
+ */
+ public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+ synchronized (containerActions) {
+ return containerActions.parallelStream().limit(maxLimit)
+ .collect(Collectors.toList());
}
- return results;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 260a245..020fb71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -46,8 +50,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZonedDateTime;
+import java.util.List;
import java.util.concurrent.Callable;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+
/**
* Heartbeat class for SCMs.
*/
@@ -59,6 +69,7 @@ public class HeartbeatEndpointTask
private final Configuration conf;
private DatanodeDetailsProto datanodeDetailsProto;
private StateContext context;
+ private int maxContainerActionsPerHB;
/**
* Constructs a SCM heart beat.
@@ -70,6 +81,8 @@ public class HeartbeatEndpointTask
this.rpcEndpoint = rpcEndpoint;
this.conf = conf;
this.context = context;
+ this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
+ HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
}
/**
@@ -107,7 +120,7 @@ public class HeartbeatEndpointTask
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
-
+ addContainerActions(requestBuilder);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto);
@@ -140,6 +153,24 @@ public class HeartbeatEndpointTask
}
/**
+ * Adds all the pending ContainerActions to the heartbeat.
+ *
+ * @param requestBuilder builder to which the report has to be added.
+ */
+ private void addContainerActions(
+ SCMHeartbeatRequestProto.Builder requestBuilder) {
+ List<ContainerAction> actions = context.getPendingContainerAction(
+ maxContainerActionsPerHB);
+ if (!actions.isEmpty()) {
+ ContainerActionsProto cap = ContainerActionsProto.newBuilder()
+ .addAllContainerActions(actions)
+ .build();
+ requestBuilder.setContainerActions(cap);
+ }
+ }
+
+
+ /**
* Returns a builder class for HeartbeatEndpointTask task.
* @return Builder.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 4238389..d89567b 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
- optional ContainerActionsProto containerActions = 4;
- optional CommandStatusReportsProto commandStatusReport = 5;
+ optional CommandStatusReportsProto commandStatusReport = 4;
+ optional ContainerActionsProto containerActions = 5;
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index a0db2e8..811599f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -178,22 +171,6 @@ public class TestReportPublisher {
executorService.shutdown();
}
- @Test
- public void testAddingReportToHeartbeat() {
- GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
- GeneratedMessage containerReport = ContainerReportsProto
- .getDefaultInstance();
- SCMHeartbeatRequestProto.Builder heartbeatBuilder =
- SCMHeartbeatRequestProto.newBuilder();
- heartbeatBuilder.setDatanodeDetails(
- getDatanodeDetails().getProtoBufMessage());
- addReport(heartbeatBuilder, nodeReport);
- addReport(heartbeatBuilder, containerReport);
- SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
- Assert.assertTrue(heartbeat.hasNodeReport());
- Assert.assertTrue(heartbeat.hasContainerReport());
- }
-
/**
* Get a datanode details.
*
@@ -222,22 +199,4 @@ public class TestReportPublisher {
return builder.build();
}
- /**
- * Adds the report to heartbeat.
- *
- * @param requestBuilder builder to which the report has to be added.
- * @param report the report to be added.
- */
- private static void addReport(SCMHeartbeatRequestProto.Builder
- requestBuilder, GeneratedMessage report) {
- String reportName = report.getDescriptorForType().getFullName();
- for (Descriptors.FieldDescriptor descriptor :
- SCMHeartbeatRequestProto.getDescriptor().getFields()) {
- String heartbeatFieldName = descriptor.getMessageType().getFullName();
- if (heartbeatFieldName.equals(reportName)) {
- requestBuilder.setField(descriptor, report);
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
new file mode 100644
index 0000000..b4d718d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * This class tests the functionality of HeartbeatEndpointTask.
+ */
+public class TestHeartbeatEndpointTask {
+
+
+ @Test
+ public void testheartbeatWithoutReports() throws Exception {
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithNodeReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(NodeReportProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertTrue(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithContainerReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(ContainerReportsProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertTrue(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithCommandStatusReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertTrue(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithContainerActions() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addContainerAction(getContainerAction());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertTrue(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithAllReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(NodeReportProto.getDefaultInstance());
+ context.addReport(ContainerReportsProto.getDefaultInstance());
+ context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.addContainerAction(getContainerAction());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertTrue(heartbeat.hasNodeReport());
+ Assert.assertTrue(heartbeat.hasContainerReport());
+ Assert.assertTrue(heartbeat.hasCommandStatusReport());
+ Assert.assertTrue(heartbeat.hasContainerActions());
+ }
+
+ /**
+ * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
+ *
+ * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+ *
+ * @return HeartbeatEndpointTask
+ */
+ private HeartbeatEndpointTask getHeartbeatEndpointTask(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+ return getHeartbeatEndpointTask(conf, context, proxy);
+
+ }
+
+ /**
+ * Creates HeartbeatEndpointTask with the given conf, context and
+ * StorageContainerManager client side proxy.
+ *
+ * @param conf Configuration
+ * @param context StateContext
+ * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+ *
+ * @return HeartbeatEndpointTask
+ */
+ private HeartbeatEndpointTask getHeartbeatEndpointTask(
+ Configuration conf,
+ StateContext context,
+ StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+ DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+ .setUuid(UUID.randomUUID().toString())
+ .setHostName("localhost")
+ .setIpAddress("127.0.0.1")
+ .build();
+ EndpointStateMachine endpointStateMachine = Mockito
+ .mock(EndpointStateMachine.class);
+ Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
+ return HeartbeatEndpointTask.newBuilder()
+ .setConfig(conf)
+ .setDatanodeDetails(datanodeDetails)
+ .setContext(context)
+ .setEndpointStateMachine(endpointStateMachine)
+ .build();
+ }
+
+ private ContainerAction getContainerAction() {
+ ContainerAction.Builder builder = ContainerAction.newBuilder();
+ ContainerInfo containerInfo = ContainerInfo.newBuilder()
+ .setContainerID(1L)
+ .build();
+ builder.setContainer(containerInfo)
+ .setAction(ContainerAction.Action.CLOSE)
+ .setReason(ContainerAction.Reason.CONTAINER_FULL);
+ return builder.build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000..d120a5c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: HDDS-239. Add PipelineStateManager to track
pipeline state transition. Contributed by Mukul Kumar Singh.
Posted by xy...@apache.org.
HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9be25e34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9be25e34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9be25e34
Branch: refs/heads/trunk
Commit: 9be25e347683d26e0575458c7f470c76fd4d951b
Parents: d2acf8d
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:22:02 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:22:02 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 +
.../scm/container/common/helpers/Pipeline.java | 7 +
.../common/src/main/resources/ozone-default.xml | 12 ++
.../hdds/scm/container/ContainerMapping.java | 4 +
.../hdds/scm/exceptions/SCMException.java | 1 +
.../hdds/scm/pipelines/PipelineManager.java | 64 +++---
.../hdds/scm/pipelines/PipelineSelector.java | 212 ++++++++++++++++---
.../scm/pipelines/ratis/RatisManagerImpl.java | 33 +--
.../standalone/StandaloneManagerImpl.java | 21 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 14 ++
10 files changed, 273 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 71184cf..6e940ad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,6 +236,11 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
+ public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
+ "ozone.scm.pipeline.creation.lease.timeout";
+
+ public static final String
+ OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c5794f4..534c9fd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -214,6 +214,13 @@ public class Pipeline {
}
/**
+ * Update the State of the pipeline.
+ */
+ public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
+ lifeCycleState = nextState;
+ }
+
+ /**
* Gets the pipeline Name.
*
* @return - Name of the pipeline
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5a1d26a..69a382a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,5 +1085,17 @@
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
+ <property>
+ <name>ozone.scm.pipeline.creation.lease.timeout</name>
+ <value>60s</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>
+ Pipeline creation timeout in milliseconds to be used by SCM. When
+ BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
+ CREATING state, SCM will now wait for the configured amount of time
+ to get COMPLETE_CREATE event if it doesn't receive it will move the
+ pipeline to DELETING.
+ </description>
+ </property>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 26f4d86..f07d22b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,6 +658,10 @@ public class ContainerMapping implements Mapping {
if (containerStore != null) {
containerStore.close();
}
+
+ if (pipelineSelector != null) {
+ pipelineSelector.shutdown();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index d7d70ef..0085542 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,6 +107,7 @@ public class SCMException extends IOException {
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
+ FAILED_TO_CHANGE_PIPELINE_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index a041973..77d8211 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,41 +59,16 @@ public abstract class PipelineManager {
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(
- ReplicationFactor replicationFactor, ReplicationType replicationType)
- throws IOException {
- /**
- * In the Ozone world, we have a very simple policy.
- *
- * 1. Try to create a pipeline if there are enough free nodes.
- *
- * 2. This allows all nodes to part of a pipeline quickly.
- *
- * 3. if there are not enough free nodes, return pipeline in a
- * round-robin fashion.
- *
- * TODO: Might have to come up with a better algorithm than this.
- * Create a new placement policy that returns pipelines in round robin
- * fashion.
- */
- Pipeline pipeline = allocatePipeline(replicationFactor);
+ ReplicationFactor replicationFactor, ReplicationType replicationType) {
+ Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
- LOG.debug("created new pipeline:{} for container with " +
+ LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
- activePipelines.add(pipeline);
- activePipelineMap.put(pipeline.getPipelineName(), pipeline);
- node2PipelineMap.addPipeline(pipeline);
- } else {
- pipeline = findOpenPipeline(replicationType, replicationFactor);
- if (pipeline != null) {
- LOG.debug("re-used pipeline:{} for container with " +
- "replicationType:{} replicationFactor:{}",
- pipeline.getPipelineName(), replicationType, replicationFactor);
- }
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
- "free nodes or operational pipeline.");
+ " operational pipeline.");
return null;
} else {
return pipeline;
@@ -109,7 +84,7 @@ public abstract class PipelineManager {
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
- // 1. Check if pipeline channel already exists
+ // 1. Check if pipeline already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -132,7 +107,13 @@ public abstract class PipelineManager {
}
public abstract Pipeline allocatePipeline(
- ReplicationFactor replicationFactor) throws IOException;
+ ReplicationFactor replicationFactor);
+
+ /**
+ * Initialize the pipeline
+ * TODO: move the initialization to Ozone Client later
+ */
+ public abstract void initializePipeline(Pipeline pipeline) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
@@ -179,12 +160,23 @@ public abstract class PipelineManager {
}
/**
- * Creates a pipeline from a specified set of Nodes.
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
+ * Creates a pipeline with a specified replication factor and type.
+ * @param replicationFactor - Replication Factor.
+ * @param replicationType - Replication Type.
*/
- public abstract void createPipeline(String pipelineID,
- List<DatanodeDetails> datanodes) throws IOException;
+ public Pipeline createPipeline(ReplicationFactor replicationFactor,
+ ReplicationType replicationType) throws IOException {
+ Pipeline pipeline = allocatePipeline(replicationFactor);
+ if (pipeline != null) {
+ LOG.debug("created new pipeline:{} for container with "
+ + "replicationType:{} replicationFactor:{}",
+ pipeline.getPipelineName(), replicationType, replicationFactor);
+ activePipelines.add(pipeline);
+ activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+ node2PipelineMap.addPipeline(pipeline);
+ }
+ return pipeline;
+ }
/**
* Close the pipeline with the given clusterId.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 2955af5..08710e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -33,17 +34,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+ .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_PIPELINE_STATE;
+
/**
* Sends the request to the right pipeline manager.
*/
@@ -57,6 +69,10 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
+ private final LeaseManager<Pipeline> pipelineLeaseManager;
+ private final StateMachine<LifeCycleState,
+ HddsProtos.LifeCycleEvent> stateMachine;
+
/**
* Constructs a pipeline Selector.
*
@@ -77,6 +93,74 @@ public class PipelineSelector {
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
+ // Initialize the container state machine.
+ Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
+ long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ LOG.trace("Starting Pipeline Lease Manager.");
+ pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
+ pipelineLeaseManager.start();
+
+ // These are the steady states of a container.
+ finalStates.add(HddsProtos.LifeCycleState.OPEN);
+ finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+ this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+ finalStates);
+ initializeStateMachine();
+ }
+
+ /**
+ * Event and State Transition Mapping:
+ *
+ * State: ALLOCATED ---------------> CREATING
+ * Event: CREATE
+ *
+ * State: CREATING ---------------> OPEN
+ * Event: CREATED
+ *
+ * State: OPEN ---------------> CLOSING
+ * Event: FINALIZE
+ *
+ * State: CLOSING ---------------> CLOSED
+ * Event: CLOSE
+ *
+ * State: CREATING ---------------> CLOSED
+ * Event: TIMEOUT
+ *
+ *
+ * Container State Flow:
+ *
+ * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+ * (CREATE) | (CREATED) (FINALIZE) |
+ * | |
+ * | |
+ * |(TIMEOUT) |(CLOSE)
+ * | |
+ * +--------> [CLOSED] <--------+
+ */
+ private void initializeStateMachine() {
+ stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+ HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleEvent.CREATE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleEvent.FINALIZE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.CLOSE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
@@ -88,15 +172,14 @@ public class PipelineSelector {
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(
- List<DatanodeDetails> nodes, LifeCycleState state,
- ReplicationType replicationType, ReplicationFactor replicationFactor,
- String name) {
+ List<DatanodeDetails> nodes, ReplicationType replicationType,
+ ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
- Pipeline
- pipeline = new Pipeline(leaderId, state, replicationType,
- replicationFactor, name);
+ // A new pipeline always starts in allocated state
+ Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
+ replicationType, replicationFactor, name);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@@ -175,8 +258,35 @@ public class PipelineSelector {
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" ReplicationFactor {}", replicationType.toString(),
replicationFactor.toString());
- return manager.
- getPipeline(replicationFactor, replicationType);
+
+ /**
+ * In the Ozone world, we have a very simple policy.
+ *
+ * 1. Try to create a pipeline if there are enough free nodes.
+ *
+ * 2. This allows all nodes to part of a pipeline quickly.
+ *
+ * 3. if there are not enough free nodes, return already allocated pipeline
+ * in a round-robin fashion.
+ *
+ * TODO: Might have to come up with a better algorithm than this.
+ * Create a new placement policy that returns pipelines in round robin
+ * fashion.
+ */
+ Pipeline pipeline =
+ manager.createPipeline(replicationFactor, replicationType);
+ if (pipeline == null) {
+ // try to return a pipeline from already allocated pipelines
+ pipeline = manager.getPipeline(replicationFactor, replicationType);
+ } else {
+ // if a new pipeline is created, initialize its state machine
+ updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
+
+ //TODO: move the initialization of pipeline to Ozone Client
+ manager.initializePipeline(pipeline);
+ updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
+ }
+ return pipeline;
}
/**
@@ -194,19 +304,6 @@ public class PipelineSelector {
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- */
-
- public void createPipeline(ReplicationType replicationType, String
- pipelineID, List<DatanodeDetails> datanodes) throws IOException {
- PipelineManager manager = getPipelineManager(replicationType);
- Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
- datanodes.stream().map(DatanodeDetails::toString)
- .collect(Collectors.joining(",")));
- manager.createPipeline(pipelineID, datanodes);
- }
/**
* Close the pipeline with the given clusterId.
@@ -251,12 +348,77 @@ public class PipelineSelector {
}
public void removePipeline(UUID dnId) {
- Set<Pipeline> pipelineChannelSet =
+ Set<Pipeline> pipelineSet =
node2PipelineMap.getPipelines(dnId);
- for (Pipeline pipelineChannel : pipelineChannelSet) {
- getPipelineManager(pipelineChannel.getType())
- .removePipeline(pipelineChannel);
+ for (Pipeline pipeline : pipelineSet) {
+ getPipelineManager(pipeline.getType())
+ .removePipeline(pipeline);
}
node2PipelineMap.removeDatanode(dnId);
}
+
+ /**
+ * Update the Pipeline State to the next state.
+ *
+ * @param pipeline - Pipeline
+ * @param event - LifeCycle Event
+ * @throws SCMException on Failure.
+ */
+ public void updatePipelineState(Pipeline pipeline,
+ HddsProtos.LifeCycleEvent event) throws IOException {
+ HddsProtos.LifeCycleState newState;
+ try {
+ newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+ } catch (InvalidStateTransitionException ex) {
+ String error = String.format("Failed to update pipeline state %s, " +
+ "reason: invalid state transition from state: %s upon " +
+ "event: %s.",
+ pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+ LOG.error(error);
+ throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
+
+ // This is a post condition after executing getNextState.
+ Preconditions.checkNotNull(newState);
+ Preconditions.checkNotNull(pipeline);
+ try {
+ switch (event) {
+ case CREATE:
+ // Acquire lease on pipeline
+ Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
+ // Register callback to be executed in case of timeout
+ pipelineLease.registerCallBack(() -> {
+ updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
+ return null;
+ });
+ break;
+ case CREATED:
+ // Release the lease on pipeline
+ pipelineLeaseManager.release(pipeline);
+ break;
+
+ case FINALIZE:
+ //TODO: cleanup pipeline by closing all the containers on the pipeline
+ break;
+
+ case CLOSE:
+ case TIMEOUT:
+ // TODO: Release the nodes here when pipelines are destroyed
+ break;
+ default:
+ throw new SCMException("Unsupported pipeline LifeCycleEvent.",
+ FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
+
+ pipeline.setLifeCycleState(newState);
+ } catch (LeaseException e) {
+ throw new IOException("Lease Exception.", e);
+ }
+ }
+
+ public void shutdown() {
+ if (pipelineLeaseManager != null) {
+ pipelineLeaseManager.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index a8f8b20..c726ef6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -72,7 +71,7 @@ public class RatisManagerImpl extends PipelineManager {
* Allocates a new ratis Pipeline from the free nodes.
*
* @param factor - One or Three
- * @return PipelineChannel.
+ * @return Pipeline.
*/
public Pipeline allocatePipeline(ReplicationFactor factor) {
List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -89,35 +88,23 @@ public class RatisManagerImpl extends PipelineManager {
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
- // Start all channel names with "Ratis", easy to grep the logs.
+ // Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
- Pipeline pipeline=
- PipelineSelector.newPipelineFromNodes(newNodesList,
- LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
- try (XceiverClientRatis client =
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client.createPipeline(pipeline.getPipelineName(), newNodesList);
- } catch (IOException e) {
- return null;
- }
- return pipeline;
+ return PipelineSelector.newPipelineFromNodes(newNodesList,
+ ReplicationType.RATIS, factor, pipelineName);
}
}
}
return null;
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- *
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
- */
- @Override
- public void createPipeline(String pipelineID,
- List<DatanodeDetails> datanodes) {
-
+ public void initializePipeline(Pipeline pipeline) throws IOException {
+ //TODO:move the initialization from SCM to client
+ try (XceiverClientRatis client =
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+ client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index cf691bf..bb4951f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -86,29 +85,19 @@ public class StandaloneManagerImpl extends PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
- LOG.info("Allocating a new standalone pipeline channel of size: {}",
- count);
- String channelName =
+ LOG.info("Allocating a new standalone pipeline of size: {}", count);
+ String pipelineName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
- LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
- ReplicationFactor.ONE, channelName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
}
}
}
return null;
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- *
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
- */
- @Override
- public void createPipeline(String pipelineID,
- List<DatanodeDetails> datanodes) {
- //return newPipelineFromNodes(datanodes, pipelineID);
+ public void initializePipeline(Pipeline pipeline) {
+ // Nothing to be done for standalone pipeline
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index bc3505f..ffac6d5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
@@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
+ private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
@@ -66,6 +69,7 @@ public class TestNode2PipelineMap {
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+ pipelineSelector = mapping.getPipelineSelector();
}
/**
@@ -113,5 +117,15 @@ public class TestNode2PipelineMap {
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
+
+ try {
+ pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ Assert.fail("closing of pipeline without finalize should fail");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof SCMException);
+ Assert.assertEquals(((SCMException)e).getResult(),
+ SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org