You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/03 03:34:31 UTC
[hadoop-ozone] branch HDDS-2823 updated: HDDS-3693 Switch to new
StateManager interface. (#1007)
This is an automated email from the ASF dual-hosted git repository.
licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new bc68c48 HDDS-3693 Switch to new StateManager interface. (#1007)
bc68c48 is described below
commit bc68c4843a84b9738c8f3e1bd3c47eaf47d4f12f
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Wed Jun 3 11:34:21 2020 +0800
HDDS-3693 Switch to new StateManager interface. (#1007)
Co-authored-by: Li Cheng <ti...@tencent.com>
---
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 2 +-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 22 ++++--
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 5 +-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 6 +-
.../hdds/scm/pipeline/PipelineStateManager.java | 80 ++++++++++++++++----
.../scm/pipeline/PipelineStateManagerV2Impl.java | 52 +++++++++++--
.../hdds/scm/pipeline/RatisPipelineProvider.java | 4 +-
.../hdds/scm/pipeline/RatisPipelineUtils.java | 2 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 4 +-
.../hdds/scm/pipeline/SimplePipelineProvider.java | 2 +-
...pelineStateManagerV2.java => StateManager.java} | 23 +++++-
.../scm/pipeline/MockRatisPipelineProvider.java | 20 ++---
.../hdds/scm/pipeline/TestPipelieManagerImpl.java | 87 ++++++++++++++++++++++
.../TestPipelineDatanodesIntersection.java | 2 +-
.../scm/pipeline/TestPipelineStateManager.java | 2 +-
.../ozone/recon/scm/ReconPipelineManager.java | 3 +-
16 files changed, 262 insertions(+), 54 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index e1cf382..bdd5053 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -39,7 +39,7 @@ public class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
- PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
+ PipelineFactory(NodeManager nodeManager, StateManager stateManager,
ConfigurationSource conf, EventPublisher eventPublisher) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index a6a3249..f451000 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -58,13 +58,13 @@ import java.util.stream.Collectors;
* All the write operations for pipelines must come via PipelineManager.
* It synchronises all write and read operations via a ReadWriteLock.
*/
-public class PipelineManagerV2Impl implements PipelineManager {
+public final class PipelineManagerV2Impl implements PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
private final ReadWriteLock lock;
private PipelineFactory pipelineFactory;
- private PipelineStateManagerV2 stateManager;
+ private StateManager stateManager;
private Scheduler scheduler;
private BackgroundPipelineCreator backgroundPipelineCreator;
private final ConfigurationSource conf;
@@ -77,9 +77,9 @@ public class PipelineManagerV2Impl implements PipelineManager {
// to prevent pipelines being created until sufficient nodes have registered.
private final AtomicBoolean pipelineCreationAllowed;
- public PipelineManagerV2Impl(ConfigurationSource conf,
+ private PipelineManagerV2Impl(ConfigurationSource conf,
NodeManager nodeManager,
- PipelineStateManagerV2 pipelineStateManager,
+ StateManager pipelineStateManager,
PipelineFactory pipelineFactory) {
this.lock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
@@ -100,17 +100,20 @@ public class PipelineManagerV2Impl implements PipelineManager {
this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
}
- public static PipelineManager newPipelineManager(
+ public static PipelineManagerV2Impl newPipelineManager(
ConfigurationSource conf, SCMHAManager scmhaManager,
NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
- PipelineFactory pipelineFactory) throws IOException {
+ EventPublisher eventPublisher) throws IOException {
// Create PipelineStateManager
- PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
+ StateManager stateManager = PipelineStateManagerV2Impl
.newBuilder().setPipelineStore(pipelineStore)
.setRatisServer(scmhaManager.getRatisServer())
.setNodeManager(nodeManager)
.build();
+ // Create PipelineFactory
+ PipelineFactory pipelineFactory = new PipelineFactory(
+ nodeManager, stateManager, conf, eventPublisher);
// Create PipelineManager
PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
nodeManager, stateManager, pipelineFactory);
@@ -572,6 +575,11 @@ public class PipelineManagerV2Impl implements PipelineManager {
return pipelineCreationAllowed.get();
}
+ @VisibleForTesting
+ public void allowPipelineCreation() {
+ this.pipelineCreationAllowed.set(true);
+ }
+
private void setBackgroundPipelineCreator(
BackgroundPipelineCreator backgroundPipelineCreator) {
this.backgroundPipelineCreator = backgroundPipelineCreator;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 524b5ec..6573271 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -51,7 +51,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
- private final PipelineStateManager stateManager;
+ private final StateManager stateManager;
private final ConfigurationSource conf;
private final int heavyNodeCriteria;
private static final int REQUIRED_RACKS = 2;
@@ -65,7 +65,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* @param conf Configuration
*/
public PipelinePlacementPolicy(final NodeManager nodeManager,
- final PipelineStateManager stateManager, final ConfigurationSource conf) {
+ final StateManager stateManager,
+ final ConfigurationSource conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 533f77e..576d415 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -37,10 +37,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
public abstract class PipelineProvider {
private final NodeManager nodeManager;
- private final PipelineStateManager stateManager;
+ private final StateManager stateManager;
public PipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager) {
+ StateManager stateManager) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
}
@@ -54,7 +54,7 @@ public abstract class PipelineProvider {
return nodeManager;
}
- public PipelineStateManager getPipelineStateManager() {
+ public StateManager getPipelineStateManager() {
return stateManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index bb56a03..de6f186 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -38,7 +39,7 @@ import java.util.NavigableSet;
* state. All the read and write operations in PipelineStateMap are protected
* by a read write lock.
*/
-public class PipelineStateManager {
+public class PipelineStateManager implements StateManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineStateManager.class);
@@ -48,71 +49,90 @@ public class PipelineStateManager {
public PipelineStateManager() {
this.pipelineStateMap = new PipelineStateMap();
}
-
+ @Override
public void addPipeline(Pipeline pipeline) throws IOException {
pipelineStateMap.addPipeline(pipeline);
LOG.info("Created pipeline {}", pipeline);
}
- void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
+ @Override
+ public void addContainerToPipeline(PipelineID pipelineId,
+ ContainerID containerID)
throws IOException {
pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
}
- Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
+ @Override
+ public Pipeline getPipeline(PipelineID pipelineID)
+ throws PipelineNotFoundException {
return pipelineStateMap.getPipeline(pipelineID);
}
+ @Override
public List<Pipeline> getPipelines() {
return pipelineStateMap.getPipelines();
}
- List<Pipeline> getPipelines(ReplicationType type) {
+ @Override
+ public List<Pipeline> getPipelines(ReplicationType type) {
return pipelineStateMap.getPipelines(type);
}
- List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) {
+ @Override
+ public List<Pipeline> getPipelines(ReplicationType type,
+ ReplicationFactor factor) {
return pipelineStateMap.getPipelines(type, factor);
}
- List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+ @Override
+ public List<Pipeline> getPipelines(ReplicationType type,
+ ReplicationFactor factor,
PipelineState state) {
return pipelineStateMap.getPipelines(type, factor, state);
}
- List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+ @Override
+ public List<Pipeline> getPipelines(
+ ReplicationType type, ReplicationFactor factor,
PipelineState state, Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
return pipelineStateMap
.getPipelines(type, factor, state, excludeDns, excludePipelines);
}
- List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
+ @Override
+ public List<Pipeline> getPipelines(ReplicationType type,
+ PipelineState... states) {
return pipelineStateMap.getPipelines(type, states);
}
- NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+ @Override
+ public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
throws IOException {
return pipelineStateMap.getContainers(pipelineID);
}
- int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+ @Override
+ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getNumberOfContainers(pipelineID);
}
- Pipeline removePipeline(PipelineID pipelineID) throws IOException {
+ @Override
+ public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
LOG.info("Pipeline {} removed from db", pipeline);
return pipeline;
}
- void removeContainerFromPipeline(PipelineID pipelineID,
+ @Override
+ public void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
}
- Pipeline finalizePipeline(PipelineID pipelineId)
- throws PipelineNotFoundException {
+ @Override
+ public Pipeline finalizePipeline(PipelineID pipelineId)
+ throws IOException {
Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
if (!pipeline.isClosed()) {
pipeline = pipelineStateMap
@@ -122,7 +142,8 @@ public class PipelineStateManager {
return pipeline;
}
- Pipeline openPipeline(PipelineID pipelineId) throws IOException {
+ @Override
+ public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
@@ -141,6 +162,7 @@ public class PipelineStateManager {
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
+ @Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
@@ -153,14 +175,40 @@ public class PipelineStateManager {
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
+ @Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
.updatePipelineState(pipelineID, PipelineState.DORMANT);
}
+ @Override
public void updatePipelineState(PipelineID id, PipelineState newState)
throws PipelineNotFoundException {
pipelineStateMap.updatePipelineState(id, newState);
}
+
+ @Override
+ public void addPipeline(HddsProtos.Pipeline pipelineProto)
+ throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+ throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void updatePipelineState(HddsProtos.PipelineID pipelineIDProto,
+ HddsProtos.PipelineState newState)
+ throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index c74dc86..703cdec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -42,7 +42,7 @@ import java.util.NavigableSet;
* state. All the read and write operations in PipelineStateMap are protected
* by a read write lock.
*/
-public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
+public class PipelineStateManagerV2Impl implements StateManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineStateManager.class);
@@ -180,6 +180,48 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
pipelineStore.close();
}
+ // TODO Remove legacy
+ @Override
+ public void addPipeline(Pipeline pipeline) throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void updatePipelineState(PipelineID id,
+ Pipeline.PipelineState newState)
+ throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public Pipeline finalizePipeline(PipelineID pipelineId)
+ throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+
+ @Override
+ public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void activatePipeline(PipelineID pipelineID) throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ @Override
+ public void deactivatePipeline(PipelineID pipelineID) throws IOException {
+ throw new IOException("Not supported.");
+ }
+
+ // legacy interfaces end
+
public static Builder newBuilder() {
return new Builder();
}
@@ -208,19 +250,19 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
return this;
}
- public PipelineStateManagerV2 build() throws IOException {
+ public StateManager build() throws IOException {
Preconditions.checkNotNull(pipelineStore);
- final PipelineStateManagerV2 pipelineStateManager =
+ final StateManager pipelineStateManager =
new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
pipelineStateManager, scmRatisServer);
- return (PipelineStateManagerV2) Proxy.newProxyInstance(
+ return (StateManager) Proxy.newProxyInstance(
SCMHAInvocationHandler.class.getClassLoader(),
- new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler);
+ new Class<?>[]{StateManager.class}, invocationHandler);
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 4d91541..821ed30 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -54,8 +54,8 @@ public class RatisPipelineProvider extends PipelineProvider {
private int maxPipelinePerDatanode;
RatisPipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager, ConfigurationSource conf,
- EventPublisher eventPublisher) {
+ StateManager stateManager, ConfigurationSource conf,
+ EventPublisher eventPublisher) {
super(nodeManager, stateManager);
this.conf = conf;
this.eventPublisher = eventPublisher;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index edc40af..19a8fc5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -127,7 +127,7 @@ public final class RatisPipelineUtils {
* @return list of matched pipeline
*/
static List<Pipeline> checkPipelineContainSameDatanodes(
- PipelineStateManagerV2 stateManager, Pipeline pipeline) {
+ StateManager stateManager, Pipeline pipeline) {
return stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 941ce19..71c3190 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -68,7 +68,7 @@ public class SCMPipelineManager implements PipelineManager {
private final ReadWriteLock lock;
private PipelineFactory pipelineFactory;
- private PipelineStateManager stateManager;
+ private StateManager stateManager;
private final BackgroundPipelineCreator backgroundPipelineCreator;
private Scheduler scheduler;
@@ -133,7 +133,7 @@ public class SCMPipelineManager implements PipelineManager {
this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
}
- public PipelineStateManager getStateManager() {
+ public StateManager getStateManager() {
return stateManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index c7b6305..69711bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -34,7 +34,7 @@ import java.util.List;
public class SimplePipelineProvider extends PipelineProvider {
public SimplePipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager) {
+ StateManager stateManager) {
super(nodeManager, stateManager);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
similarity index 83%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
index 4021575..3a772e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
@@ -29,8 +29,9 @@ import java.util.NavigableSet;
/**
* Manages the state of pipelines in SCM.
+ * TODO Rename to PipelineStateManager once the old state manager is removed.
*/
-public interface PipelineStateManagerV2 {
+public interface StateManager {
/**
* Adding pipeline would be replicated to Ratis.
@@ -96,4 +97,24 @@ public interface PipelineStateManagerV2 {
ContainerID containerID) throws IOException;
void close() throws Exception;
+
+ // TODO remove legacy interfaces once we switch to Ratis based.
+
+ void addPipeline(Pipeline pipeline) throws IOException;
+
+ Pipeline removePipeline(PipelineID pipelineID) throws IOException;
+
+ void updatePipelineState(PipelineID id, Pipeline.PipelineState newState)
+ throws IOException;
+
+ Pipeline finalizePipeline(PipelineID pipelineId)
+ throws IOException;
+
+ Pipeline openPipeline(PipelineID pipelineId) throws IOException;
+
+ void activatePipeline(PipelineID pipelineID)
+ throws IOException;
+
+ void deactivatePipeline(PipelineID pipelineID)
+ throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index f9fb150..e355877 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -36,29 +36,31 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
private boolean autoOpenPipeline;
private boolean isHealthy;
- public MockRatisPipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager, ConfigurationSource conf,
- EventPublisher eventPublisher, boolean autoOpen) {
+ public MockRatisPipelineProvider(
+ NodeManager nodeManager, StateManager stateManager,
+ ConfigurationSource conf, EventPublisher eventPublisher,
+ boolean autoOpen) {
super(nodeManager, stateManager, conf, eventPublisher);
autoOpenPipeline = autoOpen;
}
public MockRatisPipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager,
+ StateManager stateManager,
ConfigurationSource conf) {
super(nodeManager, stateManager, conf, new EventQueue());
}
public MockRatisPipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager,
- ConfigurationSource conf, boolean isHealthy) {
+ StateManager stateManager,
+ ConfigurationSource conf,
+ boolean isHealthy) {
super(nodeManager, stateManager, conf, new EventQueue());
this.isHealthy = isHealthy;
}
- public MockRatisPipelineProvider(NodeManager nodeManager,
- PipelineStateManager stateManager, ConfigurationSource conf,
- EventPublisher eventPublisher) {
+ public MockRatisPipelineProvider(
+ NodeManager nodeManager, StateManager stateManager,
+ ConfigurationSource conf, EventPublisher eventPublisher) {
super(nodeManager, stateManager, conf, eventPublisher);
autoOpenPipeline = true;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
new file mode 100644
index 0000000..a2a8e25
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
+
+/**
+ * Tests for PipelineManagerImpl.
+ */
+public class TestPipelieManagerImpl {
+ private PipelineManagerV2Impl pipelineManager;
+ private File testDir;
+ private DBStore dbStore;
+
+ @Before
+ public void init() throws Exception {
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+ pipelineManager = PipelineManagerV2Impl.newPipelineManager(
+ conf, MockSCMHAManager.getInstance(),
+ new MockNodeManager(true, 20),
+ SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if (pipelineManager != null) {
+ pipelineManager.close();
+ }
+ if (dbStore != null) {
+ dbStore.close();
+ }
+ FileUtil.fullyDelete(testDir);
+ }
+
+ @Test
+ public void testCreatePipeline() throws Exception {
+ Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline1 = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId()));
+
+ Pipeline pipeline2 = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
+ Assert.assertEquals(2, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
index 41eea3d..3320081 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -80,7 +80,7 @@ public class TestPipelineDatanodesIntersection {
NodeManager nodeManager= new MockNodeManager(true, nodeCount);
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, nodeHeaviness);
conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
- PipelineStateManager stateManager = new PipelineStateManager();
+ StateManager stateManager = new PipelineStateManager();
PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
stateManager, conf);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 6bff581..8252e2c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -37,7 +37,7 @@ import java.util.Set;
*/
public class TestPipelineStateManager {
- private PipelineStateManager stateManager;
+ private StateManager stateManager;
@Before
public void init() throws Exception {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index a8dd3c9..beed591 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -118,7 +117,7 @@ public class ReconPipelineManager extends SCMPipelineManager {
if (!p.getPipelineState().equals(CLOSED)) {
try {
getStateManager().updatePipelineState(pipelineID, CLOSED);
- } catch (PipelineNotFoundException e) {
+ } catch (IOException e) {
LOG.warn("Pipeline {} not found while updating state. ",
p.getId(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org