You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/10/01 11:59:05 UTC
[ozone] branch master updated: HDDS-5762. remove
piplelineStateManager V1 code (#2661)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 05eb172 HDDS-5762. remove piplelineStateManager V1 code (#2661)
05eb172 is described below
commit 05eb1726bb3b2f9f24ea1ac4640fa0874859f93a
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Fri Oct 1 19:58:50 2021 +0800
HDDS-5762. remove piplelineStateManager V1 code (#2661)
---
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 2 +-
.../hdds/scm/pipeline/PipelineManagerImpl.java | 12 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 6 +-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 6 +-
.../hdds/scm/pipeline/PipelineStateManager.java | 266 +++++++--------------
...erV2Impl.java => PipelineStateManagerImpl.java} | 58 +----
.../hdds/scm/pipeline/RatisPipelineProvider.java | 2 +-
.../hdds/scm/pipeline/RatisPipelineUtils.java | 4 +-
.../hdds/scm/pipeline/SimplePipelineProvider.java | 2 +-
.../hadoop/hdds/scm/pipeline/StateManager.java | 127 ----------
.../algorithms/DefaultLeaderChoosePolicy.java | 4 +-
.../choose/algorithms/LeaderChoosePolicy.java | 8 +-
.../algorithms/LeaderChoosePolicyFactory.java | 8 +-
.../algorithms/MinLeaderCountChoosePolicy.java | 6 +-
.../scm/container/TestContainerManagerImpl.java | 6 +-
.../hdds/scm/node/TestContainerPlacement.java | 29 ++-
.../hdds/scm/pipeline/MockPipelineManager.java | 27 ++-
.../scm/pipeline/MockRatisPipelineProvider.java | 6 +-
.../TestPipelineDatanodesIntersection.java | 52 +++-
.../scm/pipeline/TestPipelinePlacementPolicy.java | 73 +++++-
...ager.java => TestPipelineStateManagerImpl.java} | 237 ++++++++++++------
.../scm/pipeline/TestRatisPipelineProvider.java | 66 ++++-
...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 209 ----------------
.../scm/pipeline/TestSimplePipelineProvider.java | 48 +++-
.../choose/algorithms/TestLeaderChoosePolicy.java | 6 +-
.../ozone/recon/scm/ReconPipelineManager.java | 10 +-
26 files changed, 547 insertions(+), 733 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 0fbabee..b58716b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -40,7 +40,7 @@ public class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
- PipelineFactory(NodeManager nodeManager, StateManager stateManager,
+ PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
ConfigurationSource conf, EventPublisher eventPublisher,
SCMContext scmContext) {
providers = new HashMap<>();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index c6651f4..8d37144 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -70,7 +70,7 @@ public class PipelineManagerImpl implements PipelineManager {
// Limit the number of on-going ratis operation to be 1.
private final ReentrantReadWriteLock lock;
private PipelineFactory pipelineFactory;
- private StateManager stateManager;
+ private PipelineStateManager stateManager;
private BackgroundPipelineCreatorV2 backgroundPipelineCreator;
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
@@ -88,7 +88,7 @@ public class PipelineManagerImpl implements PipelineManager {
protected PipelineManagerImpl(ConfigurationSource conf,
SCMHAManager scmhaManager,
NodeManager nodeManager,
- StateManager pipelineStateManager,
+ PipelineStateManager pipelineStateManager,
PipelineFactory pipelineFactory,
EventPublisher eventPublisher,
SCMContext scmContext) {
@@ -118,8 +118,8 @@ public class PipelineManagerImpl implements PipelineManager {
EventPublisher eventPublisher,
SCMContext scmContext,
SCMServiceManager serviceManager) throws IOException {
- // Create PipelineStateManager
- StateManager stateManager = PipelineStateManagerV2Impl
+ // Create PipelineStateManagerImpl
+ PipelineStateManager stateManager = PipelineStateManagerImpl
.newBuilder().setPipelineStore(pipelineStore)
.setRatisServer(scmhaManager.getRatisServer())
.setNodeManager(nodeManager)
@@ -553,7 +553,7 @@ public class PipelineManagerImpl implements PipelineManager {
try {
stateManager.close();
} catch (Exception ex) {
- LOG.error("PipelineStateManager close failed", ex);
+ LOG.error("PipelineStateManagerImpl close failed", ex);
}
}
@@ -569,7 +569,7 @@ public class PipelineManagerImpl implements PipelineManager {
}
@VisibleForTesting
- public StateManager getStateManager() {
+ public PipelineStateManager getStateManager() {
return stateManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 361fd83..5782fbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -55,7 +55,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
- private final StateManager stateManager;
+ private final PipelineStateManager stateManager;
private final ConfigurationSource conf;
private final int heavyNodeCriteria;
private static final int REQUIRED_RACKS = 2;
@@ -70,11 +70,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* load balancing and rack awareness.
*
* @param nodeManager NodeManager
- * @param stateManager PipelineStateManager
+ * @param stateManager PipelineStateManagerImpl
* @param conf Configuration
*/
public PipelinePlacementPolicy(final NodeManager nodeManager,
- final StateManager stateManager,
+ final PipelineStateManager stateManager,
final ConfigurationSource conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 92b30b6..3c00906 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -41,10 +41,10 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
private static final Logger LOG =
LoggerFactory.getLogger(PipelineProvider.class);
private final NodeManager nodeManager;
- private final StateManager stateManager;
+ private final PipelineStateManager stateManager;
public PipelineProvider(NodeManager nodeManager,
- StateManager stateManager) {
+ PipelineStateManager stateManager) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
}
@@ -58,7 +58,7 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
return nodeManager;
}
- public StateManager getPipelineStateManager() {
+ public PipelineStateManager getPipelineStateManager() {
return stateManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index c3df3d2..ed4ddde 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -1,19 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
@@ -22,10 +21,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.utils.db.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
@@ -33,178 +30,77 @@ import java.util.List;
import java.util.NavigableSet;
/**
- * Manages the state of pipelines in SCM. All write operations like pipeline
- * creation, removal and updates should come via SCMPipelineManager.
- * PipelineStateMap class holds the data structures related to pipeline and its
- * state. All the read and write operations in PipelineStateMap are protected
- * by a read write lock.
+ * Manages the state of pipelines in SCM.
*/
-public class PipelineStateManager implements StateManager {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(PipelineStateManager.class);
-
- private final PipelineStateMap pipelineStateMap;
-
- public PipelineStateManager() {
- this.pipelineStateMap = new PipelineStateMap();
- }
- @Override
- public void addPipeline(Pipeline pipeline) throws IOException {
- pipelineStateMap.addPipeline(pipeline);
- LOG.info("Created pipeline {}", pipeline);
- }
-
- @Override
- public void addContainerToPipeline(PipelineID pipelineId,
- ContainerID containerID)
- throws IOException {
- pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
- }
-
- @Override
- public Pipeline getPipeline(PipelineID pipelineID)
- throws PipelineNotFoundException {
- return pipelineStateMap.getPipeline(pipelineID);
- }
-
- @Override
- public List<Pipeline> getPipelines() {
- return pipelineStateMap.getPipelines();
- }
-
- @Override
- public List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig
- ) {
- return pipelineStateMap.getPipelines(replicationConfig);
- }
-
- @Override
- public List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig,
- PipelineState state
- ) {
- return pipelineStateMap.getPipelines(replicationConfig, state);
- }
+public interface PipelineStateManager {
- @Override
- public List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig,
- PipelineState state, Collection<DatanodeDetails> excludeDns,
- Collection<PipelineID> excludePipelines) {
- return pipelineStateMap
- .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
- }
-
- @Override
- public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
- throws IOException {
- return pipelineStateMap.getContainers(pipelineID);
- }
-
- @Override
- public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
- return pipelineStateMap.getNumberOfContainers(pipelineID);
- }
-
- @Override
- public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
- Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
- LOG.info("Pipeline {} removed from db", pipeline);
- return pipeline;
- }
-
- @Override
- public void removeContainerFromPipeline(PipelineID pipelineID,
- ContainerID containerID) throws IOException {
- pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
- }
-
- @Override
- public Pipeline finalizePipeline(PipelineID pipelineId)
- throws IOException {
- Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
- if (!pipeline.isClosed()) {
- pipeline = pipelineStateMap
- .updatePipelineState(pipelineId, PipelineState.CLOSED);
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
- }
- return pipeline;
- }
-
- @Override
- public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
- Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
- if (pipeline.isClosed()) {
- throw new IOException("Closed pipeline can not be opened");
- }
- if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
- LOG.info("Pipeline {} moved to OPEN state", pipeline);
- pipeline = pipelineStateMap
- .updatePipelineState(pipelineId, PipelineState.OPEN);
- }
- return pipeline;
- }
+ /**
+ * Adding pipeline would be replicated to Ratis.
+ * @param pipelineProto
+ * @throws IOException
+ */
+ @Replicate
+ void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
/**
- * Activates a dormant pipeline.
- *
- * @param pipelineID ID of the pipeline to activate.
- * @throws IOException in case of any Exception
+ * Removing pipeline would be replicated to Ratis.
+ * @param pipelineIDProto
+ * @return Pipeline removed
+ * @throws IOException
*/
- @Override
- public void activatePipeline(PipelineID pipelineID)
- throws IOException {
- pipelineStateMap
- .updatePipelineState(pipelineID, PipelineState.OPEN);
- }
+ @Replicate
+ void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+ throws IOException;
/**
- * Deactivates an active pipeline.
- *
- * @param pipelineID ID of the pipeline to deactivate.
- * @throws IOException in case of any Exception
+ * Updating pipeline state would be replicated to Ratis.
+ * @param pipelineIDProto
+ * @param newState
+ * @throws IOException
*/
- @Override
- public void deactivatePipeline(PipelineID pipelineID)
- throws IOException {
- pipelineStateMap
- .updatePipelineState(pipelineID, PipelineState.DORMANT);
- }
-
- @Override
- public void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
- throws IOException {
- }
-
- @Override
- public void updatePipelineState(PipelineID id, PipelineState newState)
- throws PipelineNotFoundException {
- pipelineStateMap.updatePipelineState(id, newState);
- }
-
- @Override
- public void addPipeline(HddsProtos.Pipeline pipelineProto)
- throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
- throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void updatePipelineState(
- HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
- throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void close() {
- // Do nothing
- }
+ @Replicate
+ void updatePipelineState(
+ HddsProtos.PipelineID pipelineIDProto,
+ HddsProtos.PipelineState newState
+ )
+ throws IOException;
+
+ void addContainerToPipeline(
+ PipelineID pipelineID,
+ ContainerID containerID
+ ) throws IOException;
+
+ Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
+
+ List<Pipeline> getPipelines();
+
+ List<Pipeline> getPipelines(
+ ReplicationConfig replicationConfig
+ );
+
+ List<Pipeline> getPipelines(
+ ReplicationConfig replicationConfig,
+ Pipeline.PipelineState state
+ );
+
+ List<Pipeline> getPipelines(
+ ReplicationConfig replicationConfig,
+ Pipeline.PipelineState state,
+ Collection<DatanodeDetails> excludeDns,
+ Collection<PipelineID> excludePipelines
+ );
+
+ NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+ throws IOException;
+
+ int getNumberOfContainers(PipelineID pipelineID) throws IOException;
+
+ void removeContainerFromPipeline(PipelineID pipelineID,
+ ContainerID containerID) throws IOException;
+
+ void close() throws Exception;
+
+ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
+ throws IOException;
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
similarity index 88%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
index 3f748e9..c9b1d77 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
@@ -46,10 +46,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* state. All the read and write operations in PipelineStateMap are protected
* by a read write lock.
*/
-public class PipelineStateManagerV2Impl implements StateManager {
+public class PipelineStateManagerImpl implements PipelineStateManager {
private static final Logger LOG =
- LoggerFactory.getLogger(PipelineStateManager.class);
+ LoggerFactory.getLogger(PipelineStateManagerImpl.class);
private PipelineStateMap pipelineStateMap;
private final NodeManager nodeManager;
@@ -60,7 +60,7 @@ public class PipelineStateManagerV2Impl implements StateManager {
// See https://issues.apache.org/jira/browse/HDDS-4560
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- public PipelineStateManagerV2Impl(
+ public PipelineStateManagerImpl(
Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager,
DBTransactionBuffer buffer) throws IOException {
this.pipelineStateMap = new PipelineStateMap();
@@ -286,46 +286,6 @@ public class PipelineStateManagerV2Impl implements StateManager {
}
}
- // TODO Remove legacy
- @Override
- public void addPipeline(Pipeline pipeline) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void updatePipelineState(PipelineID id,
- Pipeline.PipelineState newState)
- throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public Pipeline finalizePipeline(PipelineID pipelineId)
- throws IOException {
- throw new IOException("Not supported.");
- }
-
-
- @Override
- public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void activatePipeline(PipelineID pipelineID) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void deactivatePipeline(PipelineID pipelineID) throws IOException {
- throw new IOException("Not supported.");
- }
-
@Override
public void reinitialize(Table<PipelineID, Pipeline> store)
throws IOException {
@@ -349,7 +309,7 @@ public class PipelineStateManagerV2Impl implements StateManager {
}
/**
- * Builder for PipelineStateManager.
+ * Builder for PipelineStateManagerImpl.
*/
public static class Builder {
private Table<PipelineID, Pipeline> pipelineStore;
@@ -378,20 +338,20 @@ public class PipelineStateManagerV2Impl implements StateManager {
return this;
}
- public StateManager build() throws IOException {
+ public PipelineStateManager build() throws IOException {
Preconditions.checkNotNull(pipelineStore);
- final StateManager pipelineStateManager =
- new PipelineStateManagerV2Impl(
+ final PipelineStateManager pipelineStateManager =
+ new PipelineStateManagerImpl(
pipelineStore, nodeManager, transactionBuffer);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
pipelineStateManager, scmRatisServer);
- return (StateManager) Proxy.newProxyInstance(
+ return (PipelineStateManager) Proxy.newProxyInstance(
SCMHAInvocationHandler.class.getClassLoader(),
- new Class<?>[]{StateManager.class}, invocationHandler);
+ new Class<?>[]{PipelineStateManager.class}, invocationHandler);
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9cc3954..d7a8067 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -65,7 +65,7 @@ public class RatisPipelineProvider
@VisibleForTesting
public RatisPipelineProvider(NodeManager nodeManager,
- StateManager stateManager,
+ PipelineStateManager stateManager,
ConfigurationSource conf,
EventPublisher eventPublisher,
SCMContext scmContext) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index ed35bcb..847b50e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -103,12 +103,12 @@ public final class RatisPipelineUtils {
* Return the list of pipelines who share the same set of datanodes
* with the input pipeline.
*
- * @param stateManager PipelineStateManager
+ * @param stateManager PipelineStateManagerImpl
* @param pipeline input pipeline
* @return list of matched pipeline
*/
static List<Pipeline> checkPipelineContainSameDatanodes(
- StateManager stateManager, Pipeline pipeline) {
+ PipelineStateManager stateManager, Pipeline pipeline) {
return stateManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE))
.stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 147f773..43af98b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -34,7 +34,7 @@ public class SimplePipelineProvider
extends PipelineProvider<StandaloneReplicationConfig> {
public SimplePipelineProvider(NodeManager nodeManager,
- StateManager stateManager) {
+ PipelineStateManager stateManager) {
super(nodeManager, stateManager);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
deleted file mode 100644
index c075dea..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.utils.db.Table;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.NavigableSet;
-
-/**
- * Manages the state of pipelines in SCM.
- * TODO Rename to PipelineStateManager once the old state manager is removed.
- */
-public interface StateManager {
-
- /**
- * Adding pipeline would be replicated to Ratis.
- * @param pipelineProto
- * @throws IOException
- */
- @Replicate
- void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
-
- /**
- * Removing pipeline would be replicated to Ratis.
- * @param pipelineIDProto
- * @return Pipeline removed
- * @throws IOException
- */
- @Replicate
- void removePipeline(HddsProtos.PipelineID pipelineIDProto)
- throws IOException;
-
- /**
- * Updating pipeline state would be replicated to Ratis.
- * @param pipelineIDProto
- * @param newState
- * @throws IOException
- */
- @Replicate
- void updatePipelineState(
- HddsProtos.PipelineID pipelineIDProto,
- HddsProtos.PipelineState newState
- )
- throws IOException;
-
- void addContainerToPipeline(
- PipelineID pipelineID,
- ContainerID containerID
- ) throws IOException;
-
- Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
-
- List<Pipeline> getPipelines();
-
- List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig
- );
-
- List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig,
- Pipeline.PipelineState state
- );
-
- List<Pipeline> getPipelines(
- ReplicationConfig replicationConfig,
- Pipeline.PipelineState state,
- Collection<DatanodeDetails> excludeDns,
- Collection<PipelineID> excludePipelines
- );
-
- NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
- throws IOException;
-
- int getNumberOfContainers(PipelineID pipelineID) throws IOException;
-
- void removeContainerFromPipeline(PipelineID pipelineID,
- ContainerID containerID) throws IOException;
-
- void close() throws Exception;
-
- // TODO remove legacy interfaces once we switch to Ratis based.
-
- void addPipeline(Pipeline pipeline) throws IOException;
-
- Pipeline removePipeline(PipelineID pipelineID) throws IOException;
-
- void updatePipelineState(PipelineID id, Pipeline.PipelineState newState)
- throws IOException;
-
- Pipeline finalizePipeline(PipelineID pipelineId)
- throws IOException;
-
- Pipeline openPipeline(PipelineID pipelineId) throws IOException;
-
- void activatePipeline(PipelineID pipelineID)
- throws IOException;
-
- void deactivatePipeline(PipelineID pipelineID)
- throws IOException;
-
- void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
- throws IOException;
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
index 0b49ed8..415cf10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import java.util.List;
@@ -31,7 +31,7 @@ import java.util.List;
public class DefaultLeaderChoosePolicy extends LeaderChoosePolicy {
public DefaultLeaderChoosePolicy(
- NodeManager nodeManager, StateManager pipelineStateManager) {
+ NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
super(nodeManager, pipelineStateManager);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
index ada7702..04c155b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import java.util.List;
@@ -29,10 +29,10 @@ import java.util.List;
public abstract class LeaderChoosePolicy {
private final NodeManager nodeManager;
- private final StateManager pipelineStateManager;
+ private final PipelineStateManager pipelineStateManager;
public LeaderChoosePolicy(
- NodeManager nodeManager, StateManager pipelineStateManager) {
+ NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
this.nodeManager = nodeManager;
this.pipelineStateManager = pipelineStateManager;
}
@@ -49,7 +49,7 @@ public abstract class LeaderChoosePolicy {
return nodeManager;
}
- protected StateManager getPipelineStateManager() {
+ protected PipelineStateManager getPipelineStateManager() {
return pipelineStateManager;
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
index 03d676e..af54d8f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public final class LeaderChoosePolicyFactory {
public static LeaderChoosePolicy getPolicy(
ConfigurationSource conf, final NodeManager nodeManager,
- final StateManager pipelineStateManager) throws SCMException {
+ final PipelineStateManager pipelineStateManager) throws SCMException {
final Class<? extends LeaderChoosePolicy> policyClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT,
@@ -53,12 +53,12 @@ public final class LeaderChoosePolicyFactory {
Constructor<? extends LeaderChoosePolicy> constructor;
try {
constructor = policyClass.getDeclaredConstructor(NodeManager.class,
- StateManager.class);
+ PipelineStateManager.class);
LOG.info("Create leader choose policy of type {}",
policyClass.getCanonicalName());
} catch (NoSuchMethodException e) {
String msg = "Failed to find constructor(NodeManager, " +
- "PipelineStateManager) for class " +
+ "PipelineStateManagerImpl) for class " +
policyClass.getCanonicalName();
LOG.error(msg);
throw new SCMException(msg,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
index 8cb1df1..d4068b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy {
LoggerFactory.getLogger(MinLeaderCountChoosePolicy.class);
public MinLeaderCountChoosePolicy(
- NodeManager nodeManager, StateManager pipelineStateManager) {
+ NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
super(nodeManager, pipelineStateManager);
}
@@ -66,7 +66,7 @@ public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy {
private Map<DatanodeDetails, Integer> getSuggestedLeaderCount(
List<DatanodeDetails> dns, NodeManager nodeManager,
- StateManager pipelineStateManager) {
+ PipelineStateManager pipelineStateManager) {
Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>();
for (DatanodeDetails dn : dns) {
suggestedLeaderCount.put(dn, 0);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index bf7af97..c6bb279 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -53,6 +54,7 @@ public class TestContainerManagerImpl {
private ContainerManager containerManager;
private SCMHAManager scmhaManager;
private SequenceIdGenerator sequenceIdGen;
+ private NodeManager nodeManager;
@Before
public void setUp() throws Exception {
@@ -63,9 +65,11 @@ public class TestContainerManagerImpl {
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
scmhaManager = MockSCMHAManager.getInstance(true);
+ nodeManager = new MockNodeManager(true, 10);
sequenceIdGen = new SequenceIdGenerator(
conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
- final PipelineManager pipelineManager = MockPipelineManager.getInstance();
+ final PipelineManager pipelineManager =
+ new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager.createPipeline(new RatisReplicationConfig(
ReplicationFactor.THREE));
containerManager = new ContainerManagerImpl(conf,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 2443345..f17ea5e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -86,6 +87,7 @@ public class TestContainerPlacement {
private SequenceIdGenerator sequenceIdGen;
private OzoneConfiguration conf;
private PipelineManager pipelineManager;
+ private NodeManager nodeManager;
@Rule
public ExpectedException thrown = ExpectedException.none();
@@ -101,7 +103,9 @@ public class TestContainerPlacement {
scmhaManager = MockSCMHAManager.getInstance(true);
sequenceIdGen = new SequenceIdGenerator(
conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
- pipelineManager = MockPipelineManager.getInstance();
+ nodeManager = new MockNodeManager(true, 10);
+ pipelineManager = new MockPipelineManager(dbStore,
+ scmhaManager, nodeManager);
pipelineManager.createPipeline(new RatisReplicationConfig(
HddsProtos.ReplicationFactor.THREE));
}
@@ -149,9 +153,9 @@ public class TestContainerPlacement {
.thenReturn(maxLayoutVersion());
Mockito.when(versionManager.getSoftwareLayoutVersion())
.thenReturn(maxLayoutVersion());
- SCMNodeManager nodeManager = new SCMNodeManager(config, storageConfig,
+ SCMNodeManager scmNodeManager = new SCMNodeManager(config, storageConfig,
eventQueue, null, SCMContext.emptyContext(), versionManager);
- return nodeManager;
+ return scmNodeManager;
}
ContainerManager createContainerManager()
@@ -183,29 +187,30 @@ public class TestContainerPlacement {
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, PlacementPolicy.class);
- SCMNodeManager nodeManager = createNodeManager(conf);
+ SCMNodeManager scmNodeManager = createNodeManager(conf);
containerManager = createContainerManager();
List<DatanodeDetails> datanodes =
- TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
+ TestUtils.getListOfRegisteredDatanodeDetails(scmNodeManager, nodeCount);
XceiverClientManager xceiverClientManager = null;
- LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+ LayoutVersionManager versionManager =
+ scmNodeManager.getLayoutVersionManager();
LayoutVersionProto layoutInfo =
toLayoutVersionProto(versionManager.getMetadataLayoutVersion(),
versionManager.getSoftwareLayoutVersion());
try {
for (DatanodeDetails datanodeDetails : datanodes) {
- nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
+ scmNodeManager.processHeartbeat(datanodeDetails, layoutInfo);
}
//TODO: wait for heartbeat to be processed
Thread.sleep(4 * 1000);
- assertEquals(nodeCount, nodeManager.getNodeCount(null, HEALTHY));
+ assertEquals(nodeCount, scmNodeManager.getNodeCount(null, HEALTHY));
assertEquals(capacity * nodeCount,
- (long) nodeManager.getStats().getCapacity().get());
+ (long) scmNodeManager.getStats().getCapacity().get());
assertEquals(used * nodeCount,
- (long) nodeManager.getStats().getScmUsed().get());
+ (long) scmNodeManager.getStats().getScmUsed().get());
assertEquals(remaining * nodeCount,
- (long) nodeManager.getStats().getRemaining().get());
+ (long) scmNodeManager.getStats().getRemaining().get());
xceiverClientManager= new XceiverClientManager(conf);
@@ -220,7 +225,7 @@ public class TestContainerPlacement {
container.containerID()).size());
} finally {
IOUtils.closeQuietly(containerManager);
- IOUtils.closeQuietly(nodeManager);
+ IOUtils.closeQuietly(scmNodeManager);
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index fce38a1..b155f28 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -21,7 +21,12 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersions;
import java.io.IOException;
import java.util.Collection;
@@ -38,12 +43,14 @@ public final class MockPipelineManager implements PipelineManager {
private PipelineStateManager stateManager;
- public static PipelineManager getInstance() {
- return new MockPipelineManager();
- }
-
- private MockPipelineManager() {
- this.stateManager = new PipelineStateManager();
+ public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager,
+ NodeManager nodeManager) throws IOException {
+ stateManager = PipelineStateManagerImpl
+ .newBuilder().setNodeManager(nodeManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
}
@Override
@@ -59,7 +66,9 @@ public final class MockPipelineManager implements PipelineManager {
.setNodes(nodes)
.setState(Pipeline.PipelineState.OPEN)
.build();
- stateManager.addPipeline(pipeline);
+
+ stateManager.addPipeline(pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION));
return pipeline;
}
@@ -145,13 +154,11 @@ public final class MockPipelineManager implements PipelineManager {
@Override
public void openPipeline(final PipelineID pipelineId)
throws IOException {
- stateManager.openPipeline(pipelineId);
}
@Override
public void closePipeline(final Pipeline pipeline, final boolean onTimeout)
throws IOException {
- stateManager.finalizePipeline(pipeline.getId());
}
@Override
@@ -188,13 +195,11 @@ public final class MockPipelineManager implements PipelineManager {
@Override
public void activatePipeline(final PipelineID pipelineID)
throws IOException {
-
}
@Override
public void deactivatePipeline(final PipelineID pipelineID)
throws IOException {
- stateManager.deactivatePipeline(pipelineID);
}
@Override
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 663c2c7..4760c0e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -38,7 +38,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
private boolean autoOpenPipeline;
public MockRatisPipelineProvider(
- NodeManager nodeManager, StateManager stateManager,
+ NodeManager nodeManager, PipelineStateManager stateManager,
ConfigurationSource conf, EventPublisher eventPublisher,
boolean autoOpen) {
super(nodeManager, stateManager,
@@ -47,14 +47,14 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
}
public MockRatisPipelineProvider(NodeManager nodeManager,
- StateManager stateManager,
+ PipelineStateManager stateManager,
ConfigurationSource conf) {
super(nodeManager, stateManager,
conf, new EventQueue(), SCMContext.emptyContext());
}
public MockRatisPipelineProvider(
- NodeManager nodeManager, StateManager stateManager,
+ NodeManager nodeManager, PipelineStateManager stateManager,
ConfigurationSource conf, EventPublisher eventPublisher) {
super(nodeManager, stateManager,
conf, eventPublisher, SCMContext.emptyContext());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
index 54692cd..fec2f2e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -18,13 +18,26 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -33,10 +46,12 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
@@ -53,11 +68,27 @@ public class TestPipelineDatanodesIntersection {
private int nodeHeaviness;
private OzoneConfiguration conf;
private boolean end;
+ private File testDir;
+ private DBStore dbStore;
@Before
- public void initialize() {
- conf = new OzoneConfiguration();
+ public void initialize() throws IOException {
+ conf = SCMTestUtils.getConf();
end = false;
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
public TestPipelineDatanodesIntersection(int nodeCount, int nodeHeaviness) {
@@ -78,11 +109,20 @@ public class TestPipelineDatanodesIntersection {
}
@Test
- public void testPipelineDatanodesIntersection() {
+ public void testPipelineDatanodesIntersection() throws IOException {
NodeManager nodeManager= new MockNodeManager(true, nodeCount);
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, nodeHeaviness);
conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
- StateManager stateManager = new PipelineStateManager();
+ SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+
+ PipelineStateManager stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
+
PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
stateManager, conf);
@@ -94,7 +134,9 @@ public class TestPipelineDatanodesIntersection {
try {
Pipeline pipeline = provider.create(new RatisReplicationConfig(
ReplicationFactor.THREE));
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
nodeManager.addPipeline(pipeline);
List<Pipeline> overlapPipelines = RatisPipelineUtils
.checkPipelineContainSameDatanodes(stateManager, pipeline);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 9b216a9..efb391e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -25,6 +26,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -35,7 +38,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -46,14 +53,18 @@ import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -77,29 +88,49 @@ public class TestPipelinePlacementPolicy {
private NetworkTopologyImpl cluster;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
private static final int PIPELINE_LOAD_LIMIT = 5;
+ private File testDir;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
- static final Logger LOG =
- LoggerFactory.getLogger(TestPipelinePlacementPolicy.class);
-
@Before
public void init() throws Exception {
cluster = initTopology();
// start with nodes with rack awareness.
nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
- conf = new OzoneConfiguration();
+ conf = SCMTestUtils.getConf();
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
10, StorageUnit.MB);
nodeManager.setNumPipelinePerDatanode(PIPELINE_LOAD_LIMIT);
- stateManager = new PipelineStateManager();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
placementPolicy = new PipelinePlacementPolicy(
nodeManager, stateManager, conf);
}
+ @After
+ public void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
+ }
+
private NetworkTopologyImpl initTopology() {
NodeSchema[] schemas = new NodeSchema[]
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
@@ -152,7 +183,7 @@ public class TestPipelinePlacementPolicy {
}
@Test
- public void testChooseNodeWithSingleNodeRack() throws SCMException {
+ public void testChooseNodeWithSingleNodeRack() throws IOException {
// There is only one node on 3 racks altogether.
List<DatanodeDetails> datanodes = new ArrayList<>();
for (Node node : SINGLE_NODE_RACK) {
@@ -162,8 +193,16 @@ public class TestPipelinePlacementPolicy {
}
MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
datanodes, false, datanodes.size());
+
+ PipelineStateManager tempPipelineStateManager = PipelineStateManagerImpl
+ .newBuilder().setNodeManager(localNodeManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
- localNodeManager, new PipelineStateManager(), conf);
+ localNodeManager, tempPipelineStateManager, conf);
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes(
new ArrayList<>(datanodes.size()),
@@ -181,7 +220,7 @@ public class TestPipelinePlacementPolicy {
}
@Test
- public void testChooseNodeNotEnoughSpace() {
+ public void testChooseNodeNotEnoughSpace() throws IOException {
// There is only one node on 3 racks altogether.
List<DatanodeDetails> datanodes = new ArrayList<>();
for (Node node : SINGLE_NODE_RACK) {
@@ -191,8 +230,16 @@ public class TestPipelinePlacementPolicy {
}
MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
datanodes, false, datanodes.size());
+
+ PipelineStateManager tempPipelineStateManager = PipelineStateManagerImpl
+ .newBuilder().setNodeManager(localNodeManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
- localNodeManager, new PipelineStateManager(), conf);
+ localNodeManager, tempPipelineStateManager, conf);
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
String expectedMessageSubstring = "Unable to find enough nodes that meet " +
@@ -237,8 +284,10 @@ public class TestPipelinePlacementPolicy {
ReplicationFactor.THREE))
.setNodes(nodes)
.build();
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
nodeManager.addPipeline(pipeline);
- stateManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipelineProto);
} catch (SCMException e) {
throw e;
//break;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
similarity index 64%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
index 49d969b..aa9c590 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
@@ -18,34 +18,78 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
/**
- * Test for PipelineStateManager.
+ * Test for PipelineStateManagerImpl.
*/
-public class TestPipelineStateManager {
+public class TestPipelineStateManagerImpl {
- private StateManager stateManager;
+ private PipelineStateManager stateManager;
+ private File testDir;
+ private DBStore dbStore;
@Before
public void init() throws Exception {
- stateManager = new PipelineStateManager();
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+
+ SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+ NodeManager nodeManager = new MockNodeManager(true, 10);
+
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
private Pipeline createDummyPipeline(int numNodes) {
@@ -70,20 +114,24 @@ public class TestPipelineStateManager {
@Test
public void testAddAndGetPipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(0);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
try {
- stateManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipelineProto);
Assert.fail("Pipeline should not have been added");
- } catch (IllegalArgumentException e) {
+ } catch (StateMachineException e) {
// replication factor and number of nodes in the pipeline do not match
Assert.assertTrue(e.getMessage().contains("do not match"));
}
// add a pipeline
pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
+ pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
try {
- stateManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipelineProto);
Assert.fail("Pipeline should not have been added");
} catch (IOException e) {
// Can not add a pipeline twice
@@ -92,10 +140,11 @@ public class TestPipelineStateManager {
// verify pipeline returned is same
Pipeline pipeline1 = stateManager.getPipeline(pipeline.getId());
- Assert.assertTrue(pipeline == pipeline1);
+ Assert.assertTrue(pipeline.getId().equals(pipeline1.getId()));
// clean up
- removePipeline(pipeline);
+ finalizePipeline(pipelineProto);
+ removePipeline(pipelineProto);
}
@Test
@@ -103,14 +152,14 @@ public class TestPipelineStateManager {
// In start there should be no pipelines
Assert.assertTrue(stateManager.getPipelines().isEmpty());
- Set<Pipeline> pipelines = new HashSet<>();
- Pipeline pipeline = createDummyPipeline(1);
+ Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
+ HddsProtos.Pipeline pipeline = createDummyPipeline(1).getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
- pipeline = createDummyPipeline(1);
+ pipeline = createDummyPipeline(1).getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
Set<Pipeline> pipelines1 = new HashSet<>(stateManager
@@ -121,35 +170,37 @@ public class TestPipelineStateManager {
Assert.assertEquals(pipelines1.size(), pipelines.size());
// clean up
- for (Pipeline pipeline1 : pipelines) {
+ for (HddsProtos.Pipeline pipeline1 : pipelines) {
+ finalizePipeline(pipeline1);
removePipeline(pipeline1);
}
}
@Test
public void testGetPipelinesByTypeAndFactor() throws IOException {
- Set<Pipeline> pipelines = new HashSet<>();
+ Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
for (HddsProtos.ReplicationType type : new ReplicationType[] {
ReplicationType.RATIS, ReplicationType.STAND_ALONE}) {
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
for (int i = 0; i < 5; i++) {
// 5 pipelines in allocated state for each type and factor
- Pipeline pipeline =
- createDummyPipeline(type, factor, factor.getNumber());
+ HddsProtos.Pipeline pipeline =
+ createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
pipelines.add(pipeline);
// 5 pipelines in open state for each type and factor
- pipeline = createDummyPipeline(type, factor, factor.getNumber());
+ pipeline = createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
// 5 pipelines in closed state for each type and factor
- pipeline = createDummyPipeline(type, factor, factor.getNumber());
+ pipeline = createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.finalizePipeline(pipeline.getId());
pipelines.add(pipeline);
}
}
@@ -171,42 +222,47 @@ public class TestPipelineStateManager {
}
//clean up
- for (Pipeline pipeline : pipelines) {
+ for (HddsProtos.Pipeline pipeline : pipelines) {
+ finalizePipeline(pipeline);
removePipeline(pipeline);
}
}
@Test
public void testGetPipelinesByTypeFactorAndState() throws IOException {
- Set<Pipeline> pipelines = new HashSet<>();
+ Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
for (HddsProtos.ReplicationType type : new ReplicationType[] {
ReplicationType.RATIS, ReplicationType.STAND_ALONE}) {
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
for (int i = 0; i < 5; i++) {
// 5 pipelines in allocated state for each type and factor
- Pipeline pipeline =
- createDummyPipeline(type, factor, factor.getNumber());
+ HddsProtos.Pipeline pipeline =
+ createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
pipelines.add(pipeline);
// 5 pipelines in open state for each type and factor
- pipeline = createDummyPipeline(type, factor, factor.getNumber());
+ pipeline = createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipeline);
pipelines.add(pipeline);
// 5 pipelines in dormant state for each type and factor
- pipeline = createDummyPipeline(type, factor, factor.getNumber());
+ pipeline = createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
- stateManager.deactivatePipeline(pipeline.getId());
+ openPipeline(pipeline);
+ deactivatePipeline(pipeline);
pipelines.add(pipeline);
// 5 pipelines in closed state for each type and factor
- pipeline = createDummyPipeline(type, factor, factor.getNumber());
+ pipeline = createDummyPipeline(type, factor, factor.getNumber())
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
stateManager.addPipeline(pipeline);
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipeline);
pipelines.add(pipeline);
}
}
@@ -231,7 +287,8 @@ public class TestPipelineStateManager {
}
//clean up
- for (Pipeline pipeline : pipelines) {
+ for (HddsProtos.Pipeline pipeline : pipelines) {
+ finalizePipeline(pipeline);
removePipeline(pipeline);
}
}
@@ -240,13 +297,15 @@ public class TestPipelineStateManager {
public void testAddAndGetContainer() throws IOException {
long containerID = 0;
Pipeline pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
pipeline = stateManager.getPipeline(pipeline.getId());
stateManager.addContainerToPipeline(pipeline.getId(),
ContainerID.valueOf(++containerID));
// move pipeline to open state
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipelineProto);
stateManager.addContainerToPipeline(pipeline.getId(),
ContainerID.valueOf(++containerID));
stateManager.addContainerToPipeline(pipeline.getId(),
@@ -257,7 +316,8 @@ public class TestPipelineStateManager {
stateManager.getContainers(pipeline.getId());
Assert.assertEquals(containerIDs.size(), containerID);
- removePipeline(pipeline);
+ finalizePipeline(pipelineProto);
+ removePipeline(pipelineProto);
try {
stateManager.addContainerToPipeline(pipeline.getId(),
ContainerID.valueOf(++containerID));
@@ -271,14 +331,16 @@ public class TestPipelineStateManager {
@Test
public void testRemovePipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
// close the pipeline
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipelineProto);
stateManager
.addContainerToPipeline(pipeline.getId(), ContainerID.valueOf(1));
try {
- stateManager.removePipeline(pipeline.getId());
+ removePipeline(pipelineProto);
Assert.fail("Pipeline should not have been removed");
} catch (IOException e) {
// can not remove a pipeline which already has containers
@@ -286,18 +348,20 @@ public class TestPipelineStateManager {
}
// close the pipeline
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipelineProto);
// remove containers and then remove the pipeline
- removePipeline(pipeline);
+ removePipeline(pipelineProto);
}
@Test
public void testRemoveContainer() throws IOException {
long containerID = 1;
Pipeline pipeline = createDummyPipeline(1);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
// create an open pipeline in stateMap
- stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
+ stateManager.addPipeline(pipelineProto);
+ openPipeline(pipelineProto);
stateManager.addContainerToPipeline(pipeline.getId(),
ContainerID.valueOf(containerID));
@@ -314,7 +378,7 @@ public class TestPipelineStateManager {
Assert.assertEquals(2, stateManager.getContainers(pipeline.getId()).size());
// move pipeline to closing state
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipelineProto);
stateManager.removeContainerFromPipeline(pipeline.getId(),
ContainerID.valueOf(containerID));
@@ -323,57 +387,66 @@ public class TestPipelineStateManager {
Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size());
// clean up
- stateManager.removePipeline(pipeline.getId());
+ removePipeline(pipelineProto);
}
@Test
public void testFinalizePipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
// finalize on ALLOCATED pipeline
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipelineProto);
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getId()).getPipelineState());
// clean up
- removePipeline(pipeline);
+ removePipeline(pipelineProto);
pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
+ pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
+ openPipeline(pipelineProto);
// finalize on OPEN pipeline
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipelineProto);
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getId()).getPipelineState());
// clean up
- removePipeline(pipeline);
+ removePipeline(pipelineProto);
pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
- stateManager.openPipeline(pipeline.getId());
- stateManager.finalizePipeline(pipeline.getId());
+ pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
+ openPipeline(pipelineProto);
+ finalizePipeline(pipelineProto);
// finalize should work on already closed pipeline
- stateManager.finalizePipeline(pipeline.getId());
+ finalizePipeline(pipelineProto);
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getId()).getPipelineState());
// clean up
- removePipeline(pipeline);
+ removePipeline(pipelineProto);
}
@Test
public void testOpenPipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
// open on ALLOCATED pipeline
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipelineProto);
Assert.assertEquals(Pipeline.PipelineState.OPEN,
stateManager.getPipeline(pipeline.getId()).getPipelineState());
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipelineProto);
// open should work on already open pipeline
Assert.assertEquals(Pipeline.PipelineState.OPEN,
stateManager.getPipeline(pipeline.getId()).getPipelineState());
// clean up
- removePipeline(pipeline);
+ finalizePipeline(pipelineProto);
+ removePipeline(pipelineProto);
}
@Test
@@ -381,14 +454,16 @@ public class TestPipelineStateManager {
Pipeline pipeline = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, 3);
// pipeline in allocated state should not be reported
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
Assert.assertEquals(0, stateManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN)
.size());
// pipeline in open state should be reported
- stateManager.openPipeline(pipeline.getId());
+ openPipeline(pipelineProto);
Assert.assertEquals(1, stateManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN)
@@ -399,27 +474,47 @@ public class TestPipelineStateManager {
pipeline2 = Pipeline.newBuilder(pipeline2)
.setState(Pipeline.PipelineState.OPEN)
.build();
+ HddsProtos.Pipeline pipelineProto2 = pipeline2
+ .getProtobufMessage(ClientVersions.CURRENT_VERSION);
// pipeline in open state should be reported
- stateManager.addPipeline(pipeline2);
+ stateManager.addPipeline(pipelineProto2);
Assert.assertEquals(2, stateManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN)
.size());
// pipeline in closed state should not be reported
- stateManager.finalizePipeline(pipeline2.getId());
+ finalizePipeline(pipelineProto2);
Assert.assertEquals(1, stateManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN)
.size());
// clean up
- removePipeline(pipeline);
- removePipeline(pipeline2);
+ finalizePipeline(pipelineProto);
+ removePipeline(pipelineProto);
+ finalizePipeline(pipelineProto2);
+ removePipeline(pipelineProto2);
}
- private void removePipeline(Pipeline pipeline) throws IOException {
- stateManager.finalizePipeline(pipeline.getId());
+ private void removePipeline(HddsProtos.Pipeline pipeline) throws IOException {
stateManager.removePipeline(pipeline.getId());
}
+
+ private void openPipeline(HddsProtos.Pipeline pipeline) throws IOException {
+ stateManager.updatePipelineState(pipeline.getId(),
+ HddsProtos.PipelineState.PIPELINE_OPEN);
+ }
+
+ private void finalizePipeline(HddsProtos.Pipeline pipeline)
+ throws IOException {
+ stateManager.updatePipelineState(pipeline.getId(),
+ HddsProtos.PipelineState.PIPELINE_CLOSED);
+ }
+
+ private void deactivatePipeline(HddsProtos.Pipeline pipeline)
+ throws IOException {
+ stateManager.updatePipelineState(pipeline.getId(),
+ HddsProtos.PipelineState.PIPELINE_DORMANT);
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 7eb8977..d1f383c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -27,17 +29,27 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.intersection;
@@ -58,6 +70,8 @@ public class TestRatisPipelineProvider {
private MockNodeManager nodeManager;
private RatisPipelineProvider provider;
private PipelineStateManager stateManager;
+ private File testDir;
+ private DBStore dbStore;
public void init(int maxPipelinePerNode) throws Exception {
init(maxPipelinePerNode, new OzoneConfiguration());
@@ -65,15 +79,34 @@ public class TestRatisPipelineProvider {
public void init(int maxPipelinePerNode, OzoneConfiguration conf)
throws Exception {
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
nodeManager = new MockNodeManager(true, 10);
nodeManager.setNumPipelinePerDatanode(maxPipelinePerNode);
+ SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
maxPipelinePerNode);
- stateManager = new PipelineStateManager();
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
provider = new MockRatisPipelineProvider(nodeManager,
stateManager, conf);
}
+ private void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
+ }
+
private static void assertPipelineProperties(
Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor,
HddsProtos.ReplicationType expectedReplicationType,
@@ -90,10 +123,14 @@ public class TestRatisPipelineProvider {
Pipeline pipeline = provider.create(new RatisReplicationConfig(factor));
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
nodeManager.addPipeline(pipeline);
Pipeline pipeline1 = provider.create(new RatisReplicationConfig(factor));
+ HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
// New pipeline should not overlap with the previous created pipeline
@@ -103,7 +140,7 @@ public class TestRatisPipelineProvider {
if (pipeline.getReplicationConfig().getRequiredNodes() == 3) {
assertNotEquals(pipeline.getNodeSet(), pipeline1.getNodeSet());
}
- stateManager.addPipeline(pipeline1);
+ stateManager.addPipeline(pipelineProto1);
nodeManager.addPipeline(pipeline1);
}
@@ -111,12 +148,14 @@ public class TestRatisPipelineProvider {
public void testCreatePipelineWithFactorThree() throws Exception {
init(1);
createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+ cleanup();
}
@Test
public void testCreatePipelineWithFactorOne() throws Exception {
init(1);
createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+ cleanup();
}
private List<DatanodeDetails> createListOfNodes(int nodeCount) {
@@ -134,16 +173,21 @@ public class TestRatisPipelineProvider {
Pipeline pipeline = provider.create(new RatisReplicationConfig(factor));
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(new RatisReplicationConfig(factor));
assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
- stateManager.addPipeline(pipeline1);
+ HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto1);
// With enough pipeline quote on datanodes, they should not share
// the same set of datanodes.
assertNotEquals(pipeline.getNodeSet(), pipeline1.getNodeSet());
+ cleanup();
}
@Test
@@ -161,6 +205,7 @@ public class TestRatisPipelineProvider {
createListOfNodes(factor.getNumber()));
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
Pipeline.PipelineState.OPEN);
+ cleanup();
}
@Test
@@ -177,6 +222,7 @@ public class TestRatisPipelineProvider {
new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
+ cleanup();
}
@Test
@@ -211,8 +257,10 @@ public class TestRatisPipelineProvider {
new RatisReplicationConfig(factor));
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
nodeManager.addPipeline(pipeline);
- stateManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipelineProto);
List<DatanodeDetails> nodes = pipeline.getNodes();
@@ -223,6 +271,7 @@ public class TestRatisPipelineProvider {
assertTrue(
"at least 1 node should have been from members of closed pipelines",
nodes.stream().anyMatch(membersOfClosedPipelines::contains));
+ cleanup();
}
@Test
@@ -257,6 +306,7 @@ public class TestRatisPipelineProvider {
Assert.assertTrue(ex.getMessage().contains(expectedErrorSubstring));
}
}
+ cleanup();
}
private void addPipeline(
@@ -269,8 +319,10 @@ public class TestRatisPipelineProvider {
.setState(open)
.setId(PipelineID.randomId())
.build();
+ HddsProtos.Pipeline pipelineProto = openPipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
- stateManager.addPipeline(openPipeline);
+ stateManager.addPipeline(pipelineProto);
nodeManager.addPipeline(openPipeline);
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
deleted file mode 100644
index fb13d05..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.cert.X509Certificate;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
-import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
-import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
-import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
-import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
-import org.apache.hadoop.hdds.utils.db.Codec;
-import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
-import org.apache.hadoop.hdds.utils.db.DBDefinition;
-import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-
-/**
- * Test SCM Metadata Store that has ONLY the pipeline table whose key uses the
- * old codec format.
- */
-public class TestSCMStoreImplWithOldPipelineIDKeyFormat
- implements SCMMetadataStore {
-
- private DBStore store;
- private final OzoneConfiguration configuration;
- private Table<PipelineID, Pipeline> pipelineTable;
-
- public TestSCMStoreImplWithOldPipelineIDKeyFormat(
- OzoneConfiguration config) throws IOException {
- this.configuration = config;
- start(configuration);
- }
-
- @Override
- public void start(OzoneConfiguration config)
- throws IOException {
- if (this.store == null) {
- this.store = DBStoreBuilder.createDBStore(config,
- new SCMDBTestDefinition());
- pipelineTable = PIPELINES.getTable(store);
- }
- }
-
- @Override
- public void stop() throws Exception {
- if (store != null) {
- store.close();
- store = null;
- }
- }
-
- @Override
- public DBStore getStore() {
- return null;
- }
-
- @Override
- public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
- return null;
- }
-
- @Override
- public Table<BigInteger, X509Certificate> getValidCertsTable() {
- return null;
- }
-
- @Override
- public Table<BigInteger, X509Certificate> getValidSCMCertsTable() {
- return null;
- }
-
- @Override
- public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
- return null;
- }
-
- @Override
- public Table<BigInteger, CertInfo> getRevokedCertsV2Table() {
- return null;
- }
-
- @Override
- public Table<Long, CRLInfo> getCRLInfoTable() {
- return null;
- }
-
- @Override
- public Table<String, Long> getCRLSequenceIdTable() {
- return null;
- }
-
- @Override
- public Table<String, Long> getSequenceIdTable() {
- return null;
- }
-
- @Override
- public TableIterator getAllCerts(CertificateStore.CertType certType) {
- return null;
- }
-
- @Override
- public Table<PipelineID, Pipeline> getPipelineTable() {
- return pipelineTable;
- }
-
- @Override
- public Table<String, TransactionInfo> getTransactionInfoTable() {
- return null;
- }
-
- @Override
- public BatchOperationHandler getBatchHandler() {
- return null;
- }
-
- @Override
- public Table<ContainerID, ContainerInfo> getContainerTable() {
- return null;
- }
-
- @Override
- public Table<ContainerID, MoveDataNodePair> getMoveTable() {
- return null;
- }
-
- /**
- * Test SCM DB Definition for the above class.
- */
- public static class SCMDBTestDefinition implements DBDefinition {
-
- public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
- PIPELINES =
- new DBColumnFamilyDefinition<>(
- "pipelines",
- PipelineID.class,
- new OldPipelineIDCodec(),
- Pipeline.class,
- new PipelineCodec());
-
- @Override
- public String getName() {
- return "scm.db";
- }
-
- @Override
- public String getLocationConfigKey() {
- return ScmConfigKeys.OZONE_SCM_DB_DIRS;
- }
-
- @Override
- public DBColumnFamilyDefinition[] getColumnFamilies() {
- return new DBColumnFamilyDefinition[] {PIPELINES};
- }
- }
-
- /**
- * Old Pipeline ID codec that relies on protobuf serialization.
- */
- public static class OldPipelineIDCodec implements Codec<PipelineID> {
- @Override
- public byte[] toPersistedFormat(PipelineID object) throws IOException {
- return object.getProtobuf().toByteArray();
- }
-
- @Override
- public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
- return null;
- }
-
- @Override
- public PipelineID copyObject(PipelineID object) {
- throw new UnsupportedOperationException();
- }
- }
-
-}
-
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index f21fc00..8fa1509 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -18,19 +18,34 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
/**
* Test for SimplePipelineProvider.
@@ -40,20 +55,45 @@ public class TestSimplePipelineProvider {
private NodeManager nodeManager;
private PipelineProvider provider;
private PipelineStateManager stateManager;
+ private File testDir;
+ private DBStore dbStore;
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
- stateManager = new PipelineStateManager();
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
provider = new SimplePipelineProvider(nodeManager, stateManager);
}
+ @After
+ public void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
+ }
+
@Test
public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline =
provider.create(new StandaloneReplicationConfig(factor));
- stateManager.addPipeline(pipeline);
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto);
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getReplicationConfig().getRequiredNodes(),
@@ -65,7 +105,9 @@ public class TestSimplePipelineProvider {
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 =
provider.create(new StandaloneReplicationConfig(factor));
- stateManager.addPipeline(pipeline1);
+ HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+ ClientVersions.CURRENT_VERSION);
+ stateManager.addPipeline(pipelineProto1);
Assert.assertEquals(pipeline1.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
index 8974f98..0cade56 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
@@ -46,7 +46,7 @@ public class TestLeaderChoosePolicy {
public void testDefaultPolicy() {
RatisPipelineProvider ratisPipelineProvider = new RatisPipelineProvider(
mock(NodeManager.class),
- mock(PipelineStateManager.class),
+ mock(PipelineStateManagerImpl.class),
conf,
mock(EventPublisher.class),
SCMContext.emptyContext());
@@ -63,7 +63,7 @@ public class TestLeaderChoosePolicy {
".HelloWorld");
new RatisPipelineProvider(
mock(NodeManager.class),
- mock(PipelineStateManager.class),
+ mock(PipelineStateManagerImpl.class),
conf,
mock(EventPublisher.class),
SCMContext.emptyContext());
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 533608d..0094320 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerV2Impl;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientVersions;
@@ -55,7 +55,7 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
private ReconPipelineManager(ConfigurationSource conf,
SCMHAManager scmhaManager,
NodeManager nodeManager,
- StateManager pipelineStateManager,
+ PipelineStateManager pipelineStateManager,
PipelineFactory pipelineFactory,
EventPublisher eventPublisher,
SCMContext scmContext) {
@@ -71,8 +71,8 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
SCMHAManager scmhaManager,
SCMContext scmContext) throws IOException {
- // Create PipelineStateManager
- StateManager stateManager = PipelineStateManagerV2Impl
+ // Create PipelineStateManagerImpl
+ PipelineStateManager stateManager = PipelineStateManagerImpl
.newBuilder()
.setPipelineStore(pipelineStore)
.setNodeManager(nodeManager)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org