You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/02/22 20:21:06 UTC
[hadoop] branch trunk updated: HDDS-1148. After allocating
container, we are not adding to container DB.
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7057980 HDDS-1148. After allocating container, we are not adding to container DB.
7057980 is described below
commit 70579805c97c0affb22b036ce8d2795007cdb6dc
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Fri Feb 22 12:20:49 2019 -0800
HDDS-1148. After allocating container, we are not adding to container DB.
---
.../hdds/scm/container/ContainerStateManager.java | 84 +++------------
.../hdds/scm/container/SCMContainerManager.java | 120 +++++++++++++++++----
2 files changed, 113 insertions(+), 91 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index c182d0a..423ac78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -121,7 +121,6 @@ public class ContainerStateManager {
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
private final ContainerStateMap containers;
private final AtomicLong containerCount;
- private final int numContainerPerOwnerInPipeline;
/**
* Constructs a Container State Manager that tracks all containers owned by
@@ -152,9 +151,6 @@ public class ContainerStateManager {
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerCount = new AtomicLong(0);
this.containers = new ContainerStateMap();
- this.numContainerPerOwnerInPipeline = configuration
- .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
- ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
}
/*
@@ -362,55 +358,6 @@ public class ContainerStateManager {
});
}
- /**
- * Return a container matching the attributes specified.
- *
- * @param size - Space needed in the Container.
- * @param owner - Owner of the container - A specific nameservice.
- * @param pipelineManager - Pipeline Manager
- * @param pipeline - Pipeline from which container needs to be matched
- * @return ContainerInfo, null if there is no match found.
- */
- ContainerInfo getMatchingContainer(final long size, String owner,
- PipelineManager pipelineManager, Pipeline pipeline) throws IOException {
-
- NavigableSet<ContainerID> containerIDs =
- pipelineManager.getContainersInPipeline(pipeline.getId());
- if (containerIDs == null) {
- LOG.error("Container list is null for pipeline=", pipeline.getId());
- return null;
- }
-
- getContainers(containerIDs, owner);
- if (containerIDs.size() < numContainerPerOwnerInPipeline) {
- synchronized (pipeline) {
- // TODO: #CLUTIL Maybe we can add selection logic inside synchronized
- // as well
- containerIDs = getContainers(
- pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
- if (containerIDs.size() < numContainerPerOwnerInPipeline) {
- ContainerInfo containerInfo =
- allocateContainer(pipelineManager, owner, pipeline);
- lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
- containerInfo.containerID());
- return containerInfo;
- }
- }
- }
-
- ContainerInfo containerInfo =
- getMatchingContainer(size, owner, pipeline.getId(), containerIDs);
- if (containerInfo == null) {
- synchronized (pipeline) {
- containerInfo =
- allocateContainer(pipelineManager, owner, pipeline);
- lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
- containerInfo.containerID());
- }
- }
- // TODO: #CLUTIL cleanup entries in lastUsedMap
- return containerInfo;
- }
/**
* Return a container matching the attributes specified.
@@ -469,9 +416,6 @@ public class ContainerStateManager {
final ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getUsedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();
-
- final ContainerState key = new ContainerState(owner, pipelineID);
- lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
}
@@ -523,21 +467,7 @@ public class ContainerStateManager {
return containers.getContainerInfo(containerID);
}
- private NavigableSet<ContainerID> getContainers(
- NavigableSet<ContainerID> containerIDs, String owner) {
- for (ContainerID cid : containerIDs) {
- try {
- if (!getContainer(cid).getOwner().equals(owner)) {
- containerIDs.remove(cid);
- }
- } catch (ContainerNotFoundException e) {
- LOG.error("Could not find container info for container id={} {}", cid,
- e);
- containerIDs.remove(cid);
- }
- }
- return containerIDs;
- }
+
void close() throws IOException {
}
@@ -583,4 +513,16 @@ public class ContainerStateManager {
containers.removeContainer(containerID);
}
+ /**
+ * Update the lastUsedmap to update with ContainerState and containerID.
+ * @param pipelineID
+ * @param containerID
+ * @param owner
+ */
+ public synchronized void updateLastUsedMap(PipelineID pipelineID,
+ ContainerID containerID, String owner) {
+ lastUsedMap.put(new ContainerState(owner, pipelineID),
+ containerID);
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 93abdcc..96c54f3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -44,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
@@ -67,6 +69,7 @@ public class SCMContainerManager implements ContainerManager {
private final MetadataStore containerStore;
private final PipelineManager pipelineManager;
private final ContainerStateManager containerStateManager;
+ private final int numContainerPerOwnerInPipeline;
/**
* Constructs a mapping class that creates mapping between container names
@@ -100,6 +103,9 @@ public class SCMContainerManager implements ContainerManager {
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = new ContainerStateManager(conf);
+ this.numContainerPerOwnerInPipeline = conf
+ .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
loadExistingContainers();
}
@@ -201,6 +207,7 @@ public class SCMContainerManager implements ContainerManager {
}
}
+
/**
* Allocates a new container.
*
@@ -215,23 +222,11 @@ public class SCMContainerManager implements ContainerManager {
throws IOException {
lock.lock();
try {
- final ContainerInfo containerInfo; containerInfo = containerStateManager
- .allocateContainer(pipelineManager, type, replicationFactor, owner);
- try {
- final byte[] containerIDBytes = Longs.toByteArray(
- containerInfo.getContainerID());
- containerStore.put(containerIDBytes,
- containerInfo.getProtobuf().toByteArray());
- } catch (IOException ex) {
- // If adding to containerStore fails, we should remove the container
- // from in-memory map.
- try {
- containerStateManager.removeContainer(containerInfo.containerID());
- } catch (ContainerNotFoundException cnfe) {
- // No need to worry much, everything is going as planned.
- }
- throw ex;
- }
+ final ContainerInfo containerInfo =
+ containerStateManager.allocateContainer(pipelineManager, type,
+ replicationFactor, owner);
+ // Add container to DB.
+ addContainerToDB(containerInfo);
return containerInfo;
} finally {
lock.unlock();
@@ -360,9 +355,45 @@ public class SCMContainerManager implements ContainerManager {
String owner, Pipeline pipeline) {
try {
//TODO: #CLUTIL See if lock is required here
- return containerStateManager
- .getMatchingContainer(sizeRequired, owner, pipelineManager,
- pipeline);
+ NavigableSet<ContainerID> containerIDs =
+ pipelineManager.getContainersInPipeline(pipeline.getId());
+
+ containerIDs = getContainersForOwner(containerIDs, owner);
+ if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+ synchronized (pipeline) {
+ // TODO: #CLUTIL Maybe we can add selection logic inside synchronized
+ // as well
+ containerIDs = getContainersForOwner(
+ pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
+ if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+ ContainerInfo containerInfo =
+ containerStateManager.allocateContainer(pipelineManager, owner,
+ pipeline);
+ // Add to DB
+ addContainerToDB(containerInfo);
+ containerStateManager.updateLastUsedMap(pipeline.getId(),
+ containerInfo.containerID(), owner);
+ return containerInfo;
+ }
+ }
+ }
+
+ ContainerInfo containerInfo =
+ containerStateManager.getMatchingContainer(sizeRequired, owner,
+ pipeline.getId(), containerIDs);
+ if (containerInfo == null) {
+ synchronized (pipeline) {
+ containerInfo =
+ containerStateManager.allocateContainer(pipelineManager, owner,
+ pipeline);
+ // Add to DB
+ addContainerToDB(containerInfo);
+ }
+ }
+ containerStateManager.updateLastUsedMap(pipeline.getId(),
+ containerInfo.containerID(), owner);
+ // TODO: #CLUTIL cleanup entries in lastUsedMap
+ return containerInfo;
} catch (Exception e) {
LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
pipeline, sizeRequired, e);
@@ -371,6 +402,55 @@ public class SCMContainerManager implements ContainerManager {
}
/**
+ * Add newly allocated container to container DB.
+ * @param containerInfo
+ * @throws IOException
+ */
+ private void addContainerToDB(ContainerInfo containerInfo)
+ throws IOException {
+ try {
+ final byte[] containerIDBytes = Longs.toByteArray(
+ containerInfo.getContainerID());
+ containerStore.put(containerIDBytes,
+ containerInfo.getProtobuf().toByteArray());
+ } catch (IOException ex) {
+ // If adding to containerStore fails, we should remove the container
+ // from in-memory map.
+ try {
+ containerStateManager.removeContainer(containerInfo.containerID());
+ } catch (ContainerNotFoundException cnfe) {
+ // This should not happen, as we are removing after adding in to
+ // container state cmap.
+ }
+ throw ex;
+ }
+ }
+
+ /**
+ * Returns the container ID's matching with specified owner.
+ * @param containerIDs
+ * @param owner
+ * @return NavigableSet<ContainerID>
+ */
+ private NavigableSet<ContainerID> getContainersForOwner(
+ NavigableSet<ContainerID> containerIDs, String owner) {
+ for (ContainerID cid : containerIDs) {
+ try {
+ if (!getContainer(cid).getOwner().equals(owner)) {
+ containerIDs.remove(cid);
+ }
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Could not find container info for container id={} {}", cid,
+ e);
+ containerIDs.remove(cid);
+ }
+ }
+ return containerIDs;
+ }
+
+
+
+ /**
* Returns the latest list of DataNodes where replica for given containerId
* exist. Throws an SCMException if no entry is found for given containerId.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org