You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/10 20:44:54 UTC
[2/2] git commit: YARN-2494. Added NodeLabels Manager internal API
and implementation. Contributed by Wangda Tan.
YARN-2494. Added NodeLabels Manager internal API and implementation. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db7f1653
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db7f1653
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db7f1653
Branch: refs/heads/trunk
Commit: db7f1653198b950e89567c06898d64f6b930a0ee
Parents: cb81bac
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Fri Oct 10 11:44:21 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Fri Oct 10 11:44:21 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 11 +
.../nodelabels/CommonNodeLabelsManager.java | 722 +++++++++++++++++++
.../nodelabels/FileSystemNodeLabelsStore.java | 255 +++++++
.../hadoop/yarn/nodelabels/NodeLabelsStore.java | 69 ++
.../nodelabels/event/NodeLabelsStoreEvent.java | 28 +
.../event/NodeLabelsStoreEventType.java | 25 +
.../event/RemoveClusterNodeLabels.java | 34 +
.../event/StoreNewClusterNodeLabels.java | 34 +
.../event/UpdateNodeToLabelsMappingsEvent.java | 37 +
.../DummyCommonNodeLabelsManager.java | 81 +++
.../yarn/nodelabels/NodeLabelTestBase.java | 76 ++
.../nodelabels/TestCommonNodeLabelsManager.java | 261 +++++++
.../TestFileSystemNodeLabelsStore.java | 252 +++++++
.../nodelabels/RMNodeLabelsManager.java | 447 ++++++++++++
.../nodelabels/DummyRMNodeLabelsManager.java | 83 +++
.../nodelabels/TestRMNodeLabelsManager.java | 367 ++++++++++
17 files changed, 2785 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 210297a..2a544ba 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -153,6 +153,9 @@ Release 2.6.0 - UNRELEASED
YARN-2544. Added admin-API objects for using node-labels. (Wangda Tan via
vinodkv)
+ YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda
+ Tan via vinodkv)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f183a90..dfb7a2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1447,6 +1447,17 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
.name();
+
+ public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
+
+ /** URI for NodeLabelManager */
+ public static final String FS_NODE_LABELS_STORE_URI = NODE_LABELS_PREFIX
+ + "fs-store.uri";
+ public static final String DEFAULT_FS_NODE_LABELS_STORE_URI = "file:///tmp/";
+ public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
+ NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
+ public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
+ "2000, 500";
public YarnConfiguration() {
super();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
new file mode 100644
index 0000000..89fbf09
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
+import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent;
+import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType;
+import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels;
+import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.ImmutableSet;
+
+public class CommonNodeLabelsManager extends AbstractService {
+ protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class);
+ // Longest label name accepted by checkAndThrowLabelName
+ private static final int MAX_LABEL_LENGTH = 255;
+ // Shared unmodifiable empty set; getLabelsByNode returns it for unknown hosts
+ public static final Set<String> EMPTY_STRING_SET = Collections
+ .unmodifiableSet(new HashSet<String>(0));
+ // NOTE(review): presumably a wildcard meaning "any label"; within this class
+ // it is only used to build ACCESS_ANY_LABEL_SET - confirm against callers
+ public static final String ANY = "*";
+ // Immutable set whose only element is ANY
+ public static final Set<String> ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY);
+ // A valid label starts with a letter or digit and continues with any number
+ // of letters, digits, '-' or '_'
+ private static final Pattern LABEL_PATTERN = Pattern
+ .compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
+ // A NodeId carrying this port addresses the Host entry as a whole instead of
+ // one Node registered under that host
+ public static final int WILDCARD_PORT = 0;
+
+ /**
+ * If a user doesn't specify the label of a queue or node, it belongs to
+ * NO_LABEL. normalizeLabel maps a null label to this value and serviceInit
+ * always registers it in labelCollections.
+ */
+ public static final String NO_LABEL = "";
+
+ // Event dispatcher used to hand store events to handleStoreEvent; stays null
+ // until initDispatcher runs, and mutators skip event creation while it is null
+ protected Dispatcher dispatcher;
+
+ // label name -> per-label record
+ protected ConcurrentMap<String, Label> labelCollections =
+ new ConcurrentHashMap<String, Label>();
+ // host name -> Host record (host-level labels plus the Nodes on that host)
+ protected ConcurrentMap<String, Host> nodeCollections =
+ new ConcurrentHashMap<String, Host>();
+
+ // Read and write views of the single ReentrantReadWriteLock created in the
+ // constructor
+ protected final ReadLock readLock;
+ protected final WriteLock writeLock;
+
+ // Persistent store for label changes; created by initNodeLabelStore
+ protected NodeLabelsStore store;
+
+  /**
+   * Per-label record kept in <code>labelCollections</code>. It carries the
+   * resource associated with the label, which starts out as
+   * <code>Resource.newInstance(0, 0)</code>.
+   */
+  protected static class Label {
+    public Resource resource = Resource.newInstance(0, 0);
+
+    protected Label() {
+    }
+  }
+
+  /**
+   * A <code>Host</code> can have multiple <code>Node</code>s. It holds the
+   * labels set on the host itself and the nodes registered under it, keyed by
+   * their <code>NodeId</code>.
+   */
+  protected static class Host {
+    public Set<String> labels;
+    public Map<NodeId, Node> nms;
+
+    protected Host() {
+      this.labels =
+          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      this.nms = new ConcurrentHashMap<NodeId, Node>();
+    }
+
+    /**
+     * Create a copy of this host: the label set is copied into a new
+     * <code>HashSet</code> and every node is copied via
+     * <code>Node.copy()</code>.
+     *
+     * @return the copied host
+     */
+    public Host copy() {
+      Host duplicate = new Host();
+      duplicate.labels = new HashSet<String>(this.labels);
+      for (Entry<NodeId, Node> nmEntry : this.nms.entrySet()) {
+        Node nodeCopy = nmEntry.getValue().copy();
+        duplicate.nms.put(nmEntry.getKey(), nodeCopy);
+      }
+      return duplicate;
+    }
+  }
+
+  /**
+   * State of a single node under a <code>Host</code>: its own labels
+   * (<code>null</code> when none were set on the node itself), its resource
+   * and whether it is running.
+   */
+  protected static class Node {
+    public Set<String> labels;
+    public Resource resource;
+    public boolean running;
+
+    protected Node() {
+      labels = null;
+      resource = Resource.newInstance(0, 0);
+      running = false;
+    }
+
+    /**
+     * Create a copy of this node. The label set is copied into a new
+     * concurrent set, the resource is cloned and the running flag is carried
+     * over.
+     *
+     * @return the copied node
+     */
+    public Node copy() {
+      Node c = new Node();
+      if (labels != null) {
+        c.labels =
+            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        // carry the labels over; without this the copy silently ends up with
+        // an empty label set
+        c.labels.addAll(labels);
+      } else {
+        c.labels = null;
+      }
+      c.resource = Resources.clone(resource);
+      c.running = running;
+      return c;
+    }
+  }
+
+  /**
+   * Event handler registered on the dispatcher; passes store events on to
+   * <code>handleStoreEvent</code> while the service is in the STARTED state
+   * and drops them otherwise.
+   */
+  private final class ForwardingEventHandler implements
+      EventHandler<NodeLabelsStoreEvent> {
+
+    @Override
+    public void handle(NodeLabelsStoreEvent event) {
+      if (!isInState(STATE.STARTED)) {
+        return;
+      }
+      handleStoreEvent(event);
+    }
+  }
+
+  // Dispatcher related code
+  /**
+   * Apply a store event to the underlying <code>NodeLabelsStore</code>,
+   * choosing the store operation from the event type.
+   *
+   * @param event
+   *          the store event to persist
+   * @throws YarnRuntimeException
+   *           wrapping the IOException when the store operation fails
+   */
+  protected void handleStoreEvent(NodeLabelsStoreEvent event) {
+    try {
+      switch (event.getType()) {
+      case ADD_LABELS:
+        StoreNewClusterNodeLabels storeNewClusterNodeLabelsEvent =
+            (StoreNewClusterNodeLabels) event;
+        store.storeNewClusterNodeLabels(storeNewClusterNodeLabelsEvent
+            .getLabels());
+        break;
+      case REMOVE_LABELS:
+        RemoveClusterNodeLabels removeClusterNodeLabelsEvent =
+            (RemoveClusterNodeLabels) event;
+        store.removeClusterNodeLabels(removeClusterNodeLabelsEvent.getLabels());
+        break;
+      case STORE_NODE_TO_LABELS:
+        UpdateNodeToLabelsMappingsEvent updateNodeToLabelsMappingsEvent =
+            (UpdateNodeToLabelsMappingsEvent) event;
+        store.updateNodeToLabelsMappings(updateNodeToLabelsMappingsEvent
+            .getNodeToLabels());
+        break;
+      }
+    } catch (IOException e) {
+      // pass the exception to the logger so the cause and stack trace are
+      // recorded alongside the message
+      LOG.error("Failed to store label modification to storage", e);
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+ public CommonNodeLabelsManager() {
+ super(CommonNodeLabelsManager.class.getName());
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+  // for UT purpose
+  /**
+   * Create and initialize the asynchronous dispatcher; it is configured to
+   * drain pending events when stopped.
+   *
+   * @param conf
+   *          configuration handed to the dispatcher
+   */
+  protected void initDispatcher(Configuration conf) {
+    AsyncDispatcher async = new AsyncDispatcher();
+    // publish the dispatcher first, then configure it
+    this.dispatcher = async;
+    async.init(conf);
+    async.setDrainEventsOnStop();
+  }
+
+  /**
+   * Initialize the label store, then register the <code>NO_LABEL</code> entry
+   * in the label collection.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    initNodeLabelStore(conf);
+
+    Label noLabelEntry = new Label();
+    labelCollections.put(NO_LABEL, noLabelEntry);
+  }
+
+  /**
+   * Create a <code>FileSystemNodeLabelsStore</code> bound to this manager,
+   * initialize it with the given configuration and run its recovery.
+   *
+   * @param conf
+   *          configuration used to initialize the store
+   */
+  protected void initNodeLabelStore(Configuration conf) throws Exception {
+    NodeLabelsStore fsStore = new FileSystemNodeLabelsStore(this);
+    this.store = fsStore;
+    fsStore.init(conf);
+    fsStore.recover();
+  }
+
+  // for UT purpose
+  /**
+   * Start the dispatcher, which is expected to be an
+   * <code>AsyncDispatcher</code>.
+   */
+  protected void startDispatcher() {
+    ((AsyncDispatcher) dispatcher).start();
+  }
+
+  /**
+   * Set up the dispatcher, register the forwarding handler for store events
+   * and start dispatching.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    // The dispatcher is created here rather than in service init: recovery
+    // runs during init and must not trigger any event handling.
+    initDispatcher(getConfig());
+
+    EventHandler<NodeLabelsStoreEvent> forwarder = new ForwardingEventHandler();
+    dispatcher.register(NodeLabelsStoreEventType.class, forwarder);
+
+    startDispatcher();
+  }
+
+  // for UT purpose
+  /**
+   * Stop the dispatcher. Does nothing when the dispatcher was never created,
+   * which happens when the service is stopped without having been started.
+   */
+  protected void stopDispatcher() {
+    AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
+    if (null != asyncDispatcher) {
+      asyncDispatcher.stop();
+    }
+  }
+
+  /**
+   * Stop the dispatcher first, then close the label store.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // dispatcher goes down before the store it writes to is finalized
+    this.stopDispatcher();
+    this.store.close();
+  }
+
+  /**
+   * Add multiple node labels to repository. Labels are trimmed and validated
+   * before anything is added; labels that already exist keep their current
+   * entry.
+   *
+   * @param labels
+   *          new node labels added; a null or empty set is ignored
+   * @throws IOException
+   *           if any label does not meet the label name requirement
+   */
+  @SuppressWarnings("unchecked")
+  public void addToCluserNodeLabels(Set<String> labels) throws IOException {
+    if (null == labels || labels.isEmpty()) {
+      return;
+    }
+
+    labels = normalizeLabels(labels);
+
+    // do a check before actual adding them, will throw exception if any of them
+    // doesn't meet label name requirement
+    for (String label : labels) {
+      checkAndThrowLabelName(label);
+    }
+
+    for (String label : labels) {
+      // never overwrite an existing entry: replacing it would discard the
+      // resource already recorded for that label
+      this.labelCollections.putIfAbsent(label, new Label());
+    }
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new StoreNewClusterNodeLabels(labels));
+    }
+
+    LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]");
+  }
+
+  /**
+   * Verify that every label about to be added to a node is already known to
+   * the cluster label collection. A null or empty map passes the check.
+   *
+   * @param addedLabelsToNode node -> labels map
+   * @throws IOException
+   *           if any requested label is unknown
+   */
+  protected void checkAddLabelsToNode(
+      Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
+    if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) {
+      return;
+    }
+
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Set<String> requested : addedLabelsToNode.values()) {
+      if (knownLabels.containsAll(requested)) {
+        continue;
+      }
+      String msg =
+          "Not all labels being added contained by known "
+              + "label collections, please check" + ", added labels=["
+              + StringUtils.join(requested, ",") + "]";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Add labels to hosts/nodes without validation, creating missing host and
+   * node entries on the way. A NodeId with the wildcard port targets the
+   * host-level label set, any other NodeId targets the node's own label set.
+   * The resulting mappings are sent to the store when a dispatcher is set.
+   *
+   * @param addedLabelsToNode node -> labels map
+   */
+  @SuppressWarnings("unchecked")
+  protected void internalAddLabelsToNode(
+      Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
+    // label set of each touched node after the addition
+    Map<NodeId, Set<String>> updated = new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> request : addedLabelsToNode.entrySet()) {
+      NodeId nodeId = request.getKey();
+      Set<String> toAdd = request.getValue();
+
+      createNodeIfNonExisted(nodeId);
+
+      Set<String> target;
+      if (nodeId.getPort() != WILDCARD_PORT) {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels == null) {
+          nm.labels = new HashSet<String>();
+        }
+        target = nm.labels;
+      } else {
+        target = nodeCollections.get(nodeId.getHost()).labels;
+      }
+      target.addAll(toAdd);
+      updated.put(nodeId, target);
+    }
+
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(updated));
+    }
+
+    // report the node->labels mappings that were stored
+    LOG.info("addLabelsToNode:");
+    for (Entry<NodeId, Set<String>> stored : updated.entrySet()) {
+      LOG.info(" NM=" + stored.getKey() + ", labels=["
+          + StringUtils.join(stored.getValue().iterator(), ",") + "]");
+    }
+  }
+
+  /**
+   * add more labels to nodes
+   *
+   * @param addedLabelsToNode node -> labels map; a null or empty map is
+   *          ignored
+   * @throws IOException
+   *           if any label is not part of the cluster label collection
+   */
+  public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
+      throws IOException {
+    // nothing to do: avoids a NullPointerException in the internal update and
+    // keeps empty records out of the store
+    if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) {
+      return;
+    }
+    checkAddLabelsToNode(addedLabelsToNode);
+    internalAddLabelsToNode(addedLabelsToNode);
+  }
+
+  /**
+   * Verify that every label about to be removed is non-empty after trimming
+   * and exists in the cluster label collection. A null or empty collection
+   * passes the check.
+   *
+   * @param labelsToRemove
+   *          node labels to remove
+   * @throws IOException
+   *           if a label is empty or unknown
+   */
+  protected void checkRemoveFromClusterNodeLabels(
+      Collection<String> labelsToRemove) throws IOException {
+    if (null == labelsToRemove || labelsToRemove.isEmpty()) {
+      return;
+    }
+
+    for (String rawLabel : labelsToRemove) {
+      String candidate = normalizeLabel(rawLabel);
+      if (null == candidate || candidate.isEmpty()) {
+        throw new IOException("Label to be removed is null or empty");
+      }
+
+      boolean known = labelCollections.containsKey(candidate);
+      if (!known) {
+        throw new IOException("Node label=" + candidate
+            + " to be removed doesn't existed in cluster "
+            + "node labels collection.");
+      }
+    }
+  }
+
+  /**
+   * Remove labels without validation: strip them from every host and node,
+   * drop them from the cluster label collection and, when a dispatcher is
+   * set, send the removal to the store.
+   *
+   * @param labelsToRemove
+   *          node labels to remove
+   */
+  @SuppressWarnings("unchecked")
+  protected void internalRemoveFromClusterNodeLabels(Collection<String> labelsToRemove) {
+    // strip the labels from all hosts and the nodes below them
+    for (Host host : nodeCollections.values()) {
+      host.labels.removeAll(labelsToRemove);
+      for (Node nm : host.nms.values()) {
+        if (nm.labels == null) {
+          continue;
+        }
+        nm.labels.removeAll(labelsToRemove);
+      }
+    }
+
+    // forget the labels themselves
+    for (String removed : labelsToRemove) {
+      labelCollections.remove(removed);
+    }
+
+    // persist the removal
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new RemoveClusterNodeLabels(labelsToRemove));
+    }
+
+    LOG.info("Remove labels: ["
+        + StringUtils.join(labelsToRemove.iterator(), ",") + "]");
+  }
+
+  /**
+   * Remove multiple node labels from repository. Labels are trimmed first so
+   * that the labels that are validated are exactly the labels that get
+   * removed.
+   *
+   * @param labelsToRemove
+   *          node labels to remove; a null or empty collection is ignored
+   * @throws IOException
+   *           if a label is empty or not part of the cluster label collection
+   */
+  public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
+      throws IOException {
+    // nothing to do: avoids a NullPointerException in the internal removal
+    if (null == labelsToRemove || labelsToRemove.isEmpty()) {
+      return;
+    }
+
+    // the check trims each label before looking it up, so remove the trimmed
+    // names as well; otherwise " x " passes the check but is never removed
+    Set<String> normalized = new HashSet<String>();
+    for (String label : labelsToRemove) {
+      normalized.add(normalizeLabel(label));
+    }
+
+    checkRemoveFromClusterNodeLabels(normalized);
+
+    internalRemoveFromClusterNodeLabels(normalized);
+  }
+
+  /**
+   * Verify a request to remove labels from nodes: the labels must be known to
+   * the cluster label collection, the addressed host/node must exist and it
+   * must currently carry all labels being removed. A null or empty map passes
+   * the check; an entry with a null or empty label set only has its node
+   * existence verified.
+   *
+   * @param removeLabelsFromNode node -> labels map
+   * @throws IOException
+   *           if any of the conditions above is violated
+   */
+  protected void checkRemoveLabelsFromNode(
+      Map<NodeId, Set<String>> removeLabelsFromNode) throws IOException {
+    if (null == removeLabelsFromNode || removeLabelsFromNode.isEmpty()) {
+      return;
+    }
+
+    // check all labels being removed existed
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      Set<String> labels = entry.getValue();
+      boolean nothingToRemove = labels == null || labels.isEmpty();
+
+      // guard against a null label set before calling containsAll
+      if (!nothingToRemove && !knownLabels.containsAll(labels)) {
+        String msg =
+            "Not all labels being removed contained by known "
+                + "label collections, please check" + ", removed labels=["
+                + StringUtils.join(labels, ",") + "]";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+
+      Set<String> originalLabels = null;
+
+      boolean nodeExisted = false;
+      if (WILDCARD_PORT != nodeId.getPort()) {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm != null) {
+          originalLabels = nm.labels;
+          nodeExisted = true;
+        }
+      } else {
+        Host host = nodeCollections.get(nodeId.getHost());
+        if (null != host) {
+          originalLabels = host.labels;
+          nodeExisted = true;
+        }
+      }
+
+      if (!nodeExisted) {
+        String msg =
+            "Try to remove labels from NM=" + nodeId
+                + ", but the NM doesn't existed";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+
+      if (nothingToRemove) {
+        continue;
+      }
+
+      // a node without labels of its own has a null label set, in which case
+      // it cannot contain the labels being removed
+      if (originalLabels == null || !originalLabels.containsAll(labels)) {
+        String msg =
+            "Try to remove labels = [" + StringUtils.join(labels, ",")
+                + "], but not all labels contained by NM=" + nodeId;
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+    }
+  }
+
+  /**
+   * Remove labels from hosts/nodes without validation. A NodeId with the
+   * wildcard port targets the host-level label set, any other NodeId targets
+   * the node's own label set (skipped when the node has none). The resulting
+   * mappings are sent to the store when a dispatcher is set.
+   *
+   * @param removeLabelsFromNode node -> labels map
+   */
+  @SuppressWarnings("unchecked")
+  protected void internalRemoveLabelsFromNode(
+      Map<NodeId, Set<String>> removeLabelsFromNode) {
+    // label set of each touched node after the removal
+    Map<NodeId, Set<String>> remaining = new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> request : removeLabelsFromNode.entrySet()) {
+      NodeId nodeId = request.getKey();
+      Set<String> toRemove = request.getValue();
+
+      if (nodeId.getPort() != WILDCARD_PORT) {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels == null) {
+          continue;
+        }
+        nm.labels.removeAll(toRemove);
+        remaining.put(nodeId, nm.labels);
+      } else {
+        Host host = nodeCollections.get(nodeId.getHost());
+        host.labels.removeAll(toRemove);
+        remaining.put(nodeId, host.labels);
+      }
+    }
+
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(remaining));
+    }
+
+    // report the node->labels mappings that were stored
+    LOG.info("removeLabelsFromNode:");
+    for (Entry<NodeId, Set<String>> stored : remaining.entrySet()) {
+      LOG.info(" NM=" + stored.getKey() + ", labels=["
+          + StringUtils.join(stored.getValue().iterator(), ",") + "]");
+    }
+  }
+
+  /**
+   * remove labels from nodes, labels being removed must be contained by these
+   * nodes
+   *
+   * @param removeLabelsFromNode node -> labels map; a null or empty map is
+   *          ignored
+   * @throws IOException
+   *           if the request does not pass validation
+   */
+  public void
+      removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
+          throws IOException {
+    // nothing to do: avoids a NullPointerException on a null map and keeps
+    // empty records out of the store
+    if (null == removeLabelsFromNode || removeLabelsFromNode.isEmpty()) {
+      return;
+    }
+
+    checkRemoveLabelsFromNode(removeLabelsFromNode);
+
+    internalRemoveLabelsFromNode(removeLabelsFromNode);
+  }
+
+  /**
+   * Verify that every replacement label is already known to the cluster label
+   * collection. A null or empty map passes the check.
+   *
+   * @param replaceLabelsToNode node -> labels map
+   * @throws IOException
+   *           if any replacement label is unknown
+   */
+  protected void checkReplaceLabelsOnNode(
+      Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
+    if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) {
+      return;
+    }
+
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Set<String> replacement : replaceLabelsToNode.values()) {
+      if (knownLabels.containsAll(replacement)) {
+        continue;
+      }
+      String msg =
+          "Not all labels being replaced contained by known "
+              + "label collections, please check" + ", new labels=["
+              + StringUtils.join(replacement, ",") + "]";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Replace the labels of hosts/nodes without validation, creating missing
+   * host and node entries on the way. The targeted label set is cleared and
+   * refilled with the given labels; the resulting mappings are sent to the
+   * store when a dispatcher is set.
+   *
+   * @param replaceLabelsToNode node -> labels map
+   */
+  @SuppressWarnings("unchecked")
+  protected void internalReplaceLabelsOnNode(
+      Map<NodeId, Set<String>> replaceLabelsToNode) {
+    // label set of each touched node after the replacement
+    Map<NodeId, Set<String>> replaced = new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> request : replaceLabelsToNode.entrySet()) {
+      NodeId nodeId = request.getKey();
+      Set<String> replacement = request.getValue();
+
+      createNodeIfNonExisted(nodeId);
+
+      Set<String> target;
+      if (nodeId.getPort() != WILDCARD_PORT) {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels == null) {
+          nm.labels = new HashSet<String>();
+        }
+        target = nm.labels;
+      } else {
+        target = nodeCollections.get(nodeId.getHost()).labels;
+      }
+      target.clear();
+      target.addAll(replacement);
+      replaced.put(nodeId, target);
+    }
+
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(replaced));
+    }
+
+    // report the node->labels mappings that were stored
+    LOG.info("setLabelsToNode:");
+    for (Entry<NodeId, Set<String>> stored : replaced.entrySet()) {
+      LOG.info(" NM=" + stored.getKey() + ", labels=["
+          + StringUtils.join(stored.getValue().iterator(), ",") + "]");
+    }
+  }
+
+  /**
+   * replace labels to nodes
+   *
+   * @param replaceLabelsToNode node -> labels map; a null or empty map is
+   *          ignored
+   * @throws IOException
+   *           if any label is not part of the cluster label collection
+   */
+  public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
+      throws IOException {
+    // nothing to do: avoids a NullPointerException in the internal update and
+    // keeps empty records out of the store
+    if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) {
+      return;
+    }
+
+    checkReplaceLabelsOnNode(replaceLabelsToNode);
+
+    internalReplaceLabelsOnNode(replaceLabelsToNode);
+  }
+
+  /**
+   * Get mapping of nodes to labels. Nodes without any effective label are
+   * left out; hosts with host-level labels are reported under a NodeId with
+   * the wildcard port.
+   *
+   * @return nodes to labels map, unmodifiable
+   */
+  public Map<NodeId, Set<String>> getNodeLabels() {
+    // acquire before entering try: if lock() itself failed inside the try
+    // block, the finally clause would unlock a lock that was never held
+    readLock.lock();
+    try {
+      Map<NodeId, Set<String>> nodeToLabels =
+          new HashMap<NodeId, Set<String>>();
+      for (Entry<String, Host> entry : nodeCollections.entrySet()) {
+        String hostName = entry.getKey();
+        Host host = entry.getValue();
+        for (NodeId nodeId : host.nms.keySet()) {
+          Set<String> nodeLabels = getLabelsByNode(nodeId);
+          if (nodeLabels == null || nodeLabels.isEmpty()) {
+            continue;
+          }
+          nodeToLabels.put(nodeId, nodeLabels);
+        }
+        if (!host.labels.isEmpty()) {
+          nodeToLabels
+              .put(NodeId.newInstance(hostName, WILDCARD_PORT), host.labels);
+        }
+      }
+      return Collections.unmodifiableMap(nodeToLabels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get existing valid labels in repository. The <code>NO_LABEL</code> entry
+   * is not reported.
+   *
+   * @return existing valid labels in repository, as an unmodifiable copy
+   */
+  public Set<String> getClusterNodeLabels() {
+    // acquire before entering try: if lock() itself failed inside the try
+    // block, the finally clause would unlock a lock that was never held
+    readLock.lock();
+    try {
+      Set<String> labels = new HashSet<String>(labelCollections.keySet());
+      labels.remove(NO_LABEL);
+      return Collections.unmodifiableSet(labels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Validate a label name: it must be non-empty, at most
+   * <code>MAX_LABEL_LENGTH</code> characters long and, once trimmed, match
+   * <code>LABEL_PATTERN</code>.
+   *
+   * @param label
+   *          label name to validate
+   * @throws IOException
+   *           if the name is invalid
+   */
+  private void checkAndThrowLabelName(String label) throws IOException {
+    boolean badLength =
+        label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH;
+    if (badLength) {
+      throw new IOException("label added is empty or exceeds "
+          + MAX_LABEL_LENGTH + " character(s)");
+    }
+
+    String trimmed = label.trim();
+    if (LABEL_PATTERN.matcher(trimmed).matches()) {
+      return;
+    }
+
+    throw new IOException("label name should only contains "
+        + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+        + ", now it is=" + trimmed);
+  }
+
+  /**
+   * Normalize a label name.
+   *
+   * @param label
+   *          label name, may be null
+   * @return the trimmed label, or <code>NO_LABEL</code> when label is null
+   */
+  protected String normalizeLabel(String label) {
+    return (label == null) ? NO_LABEL : label.trim();
+  }
+
+  /**
+   * Normalize every label of a set via <code>normalizeLabel</code>.
+   *
+   * @param labels
+   *          labels to normalize
+   * @return a new set holding the normalized labels
+   */
+  private Set<String> normalizeLabels(Set<String> labels) {
+    Set<String> normalized = new HashSet<String>();
+    for (String raw : labels) {
+      String cleaned = normalizeLabel(raw);
+      normalized.add(cleaned);
+    }
+    return normalized;
+  }
+
+  /**
+   * Look up a node in <code>nodeCollections</code>.
+   *
+   * @param nodeId
+   *          id of the node
+   * @return the node, or null when it cannot be resolved
+   */
+  protected Node getNMInNodeSet(NodeId nodeId) {
+    Node found = getNMInNodeSet(nodeId, nodeCollections);
+    return found;
+  }
+
+ protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map) {
+ return getNMInNodeSet(nodeId, map, false);
+ }
+
+  /**
+   * Look up a node in the given host map.
+   *
+   * @param nodeId
+   *          id of the node; a NodeId with the wildcard port never resolves
+   * @param map
+   *          host name -> host map to search
+   * @param checkRunning
+   *          when true, only a node marked as running is returned
+   * @return the node, or null when the host or node is unknown, the port is
+   *         the wildcard port, or the running check fails
+   */
+  protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
+      boolean checkRunning) {
+    if (nodeId.getPort() == WILDCARD_PORT) {
+      return null;
+    }
+
+    Host host = map.get(nodeId.getHost());
+    Node nm = (host == null) ? null : host.nms.get(nodeId);
+    if (nm == null) {
+      return null;
+    }
+    return (!checkRunning || nm.running) ? nm : null;
+  }
+
+  /**
+   * Get the labels in effect for a node, resolved against
+   * <code>nodeCollections</code>.
+   *
+   * @param nodeId
+   *          id of the node
+   * @return labels of the node
+   */
+  protected Set<String> getLabelsByNode(NodeId nodeId) {
+    Set<String> effective = getLabelsByNode(nodeId, nodeCollections);
+    return effective;
+  }
+
+  /**
+   * Get the labels in effect for a node, resolved against the given host map.
+   *
+   * @param nodeId
+   *          id of the node
+   * @param map
+   *          host name -> host map to search
+   * @return the node's own labels when it has any set, otherwise the labels
+   *         of its host; <code>EMPTY_STRING_SET</code> for an unknown host
+   */
+  protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
+    Host host = map.get(nodeId.getHost());
+    if (host == null) {
+      return EMPTY_STRING_SET;
+    }
+
+    Node nm = host.nms.get(nodeId);
+    boolean hasOwnLabels = nm != null && nm.labels != null;
+    return hasOwnLabels ? nm.labels : host.labels;
+  }
+
+  /**
+   * Make sure the host of the given NodeId is registered and, unless the
+   * NodeId carries the wildcard port, that the node is registered under it.
+   *
+   * @param nodeId
+   *          id of the node to register
+   */
+  protected void createNodeIfNonExisted(NodeId nodeId) {
+    String hostName = nodeId.getHost();
+    Host host = nodeCollections.get(hostName);
+    if (host == null) {
+      host = new Host();
+      nodeCollections.put(hostName, host);
+    }
+
+    if (nodeId.getPort() == WILDCARD_PORT) {
+      return;
+    }
+    if (host.nms.get(nodeId) == null) {
+      host.nms.put(nodeId, new Node());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
new file mode 100644
index 0000000..2778c74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
+
+import com.google.common.collect.Sets;
+
+public class FileSystemNodeLabelsStore extends NodeLabelsStore {
+
+  /**
+   * Create a store bound to the given manager.
+   *
+   * @param mgr
+   *          manager handed to the <code>NodeLabelsStore</code> base class
+   */
+  public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
+    super(mgr);
+  }
+
+ protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
+
+ // Directory created below the configured store URI; holds mirror and edit log
+ protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot";
+ // File name of the mirror read at the start of recover()
+ protected static final String MIRROR_FILENAME = "nodelabel.mirror";
+ // File name of the edit log that every store operation appends to
+ protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
+
+ // Record types of the edit log. The ordinal of a constant is what gets
+ // written to the log, and recover() maps it back through values(), so the
+ // declaration order must not change.
+ protected enum SerializedLogType {
+ ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS
+ }
+
+ // Path built from the configured store URI
+ Path fsWorkingPath;
+ // fsWorkingPath/ROOT_DIR_NAME
+ Path rootDirPath;
+ // File system resolved from fsWorkingPath in setFileSystem
+ FileSystem fs;
+ // Edit log stream; reopened for append and closed again by each store
+ // operation, null until the first one
+ FSDataOutputStream editlogOs;
+ // Path of the edit log file; assigned in recover()
+ Path editLogPath;
+
+ @Override
+ public void init(Configuration conf) throws Exception {
+ fsWorkingPath =
+ new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
+ YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_URI));
+ rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+
+ setFileSystem(conf);
+
+ // mkdir of root dir path
+ fs.mkdirs(rootDirPath);
+ }
+
+  /**
+   * Close the edit log stream and then the file system. Failures are logged
+   * and not propagated, and a failure to close one resource does not prevent
+   * closing the other.
+   */
+  @Override
+  public void close() throws IOException {
+    // close the stream before the file system it belongs to; it is still
+    // null when no store operation has run yet
+    try {
+      if (null != editlogOs) {
+        editlogOs.close();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception happened whiling shutting down,", e);
+    }
+
+    try {
+      if (null != fs) {
+        fs.close();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception happened whiling shutting down,", e);
+    }
+  }
+
+  /**
+   * Resolve the file system of the working path, using a copy of the
+   * configuration with the DFS client retry policy enabled and set to the
+   * configured (or default) retry policy spec.
+   *
+   * @param conf
+   *          base configuration; it is copied, not modified
+   */
+  private void setFileSystem(Configuration conf) throws IOException {
+    Configuration retryConf = new Configuration(conf);
+    retryConf.setBoolean("dfs.client.retry.policy.enabled", true);
+    retryConf.set("dfs.client.retry.policy.spec", retryConf.get(
+        YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
+        YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC));
+    this.fs = this.fsWorkingPath.getFileSystem(retryConf);
+
+    // LocalFileSystem doesn't support append, so for the local file system
+    // switch to the raw file system underneath it.
+    boolean isLocal = this.fs.getScheme().equals("file");
+    if (isLocal) {
+      this.fs = ((LocalFileSystem) this.fs).getRaw();
+    }
+  }
+
+  /**
+   * Open the edit log file for append and keep the stream in
+   * <code>editlogOs</code>.
+   */
+  private void ensureAppendEditlogFile() throws IOException {
+    this.editlogOs = this.fs.append(this.editLogPath);
+  }
+
+  /**
+   * Close the current edit log stream.
+   */
+  private void ensureCloseEditlogFile() throws IOException {
+    this.editlogOs.close();
+  }
+
+  /**
+   * Append a NODE_TO_LABELS record holding the given mappings to the edit
+   * log. The edit log stream is closed again even when writing fails.
+   *
+   * @param nodeToLabels node -> labels map to persist
+   */
+  @Override
+  public void updateNodeToLabelsMappings(
+      Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+    ensureAppendEditlogFile();
+    try {
+      editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
+      ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
+          .newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
+    } finally {
+      // don't leak the stream when the write throws
+      ensureCloseEditlogFile();
+    }
+  }
+
+  /**
+   * Append an ADD_LABELS record holding the given labels to the edit log.
+   * The edit log stream is closed again even when writing fails.
+   *
+   * @param labels labels to persist
+   */
+  @Override
+  public void storeNewClusterNodeLabels(Set<String> labels)
+      throws IOException {
+    ensureAppendEditlogFile();
+    try {
+      editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
+      ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
+          .newInstance(labels)).getProto().writeDelimitedTo(editlogOs);
+    } finally {
+      // don't leak the stream when the write throws
+      ensureCloseEditlogFile();
+    }
+  }
+
+  /**
+   * Append a REMOVE_LABELS record holding the given labels to the edit log.
+   * The edit log stream is closed again even when writing fails.
+   *
+   * @param labels labels whose removal is persisted
+   */
+  @Override
+  public void removeClusterNodeLabels(Collection<String> labels)
+      throws IOException {
+    ensureAppendEditlogFile();
+    try {
+      editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
+      ((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest
+          .newInstance(Sets.newHashSet(labels.iterator()))).getProto()
+          .writeDelimitedTo(editlogOs);
+    } finally {
+      // don't leak the stream when the write throws
+      ensureCloseEditlogFile();
+    }
+  }
+
+  /**
+   * Replays the persisted mirror and edit log into the manager, then writes a
+   * fresh mirror of the resulting state and starts a new, empty edit log.
+   *
+   * @throws IOException if the store cannot be read or written, or if the
+   *           newly written mirror cannot be moved into place
+   */
+  @Override
+  public void recover() throws IOException {
+    /*
+     * Steps of recover
+     * 1) Read from last mirror (from mirror or mirror.old)
+     * 2) Read from last edit log, and apply such edit log
+     * 3) Write new mirror to mirror.writing
+     * 4) Rename mirror to mirror.old
+     * 5) Move mirror.writing to mirror
+     * 6) Remove mirror.old
+     * 7) Remove edit log and create a new empty edit log
+     */
+
+    // Open mirror from serialized file
+    Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME);
+    Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old");
+
+    FSDataInputStream is = null;
+    if (fs.exists(mirrorPath)) {
+      is = fs.open(mirrorPath);
+    } else if (fs.exists(oldMirrorPath)) {
+      is = fs.open(oldMirrorPath);
+    }
+
+    if (null != is) {
+      try {
+        Set<String> labels =
+            new AddToClusterNodeLabelsRequestPBImpl(
+                AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
+                .getNodeLabels();
+        Map<NodeId, Set<String>> nodeToLabels =
+            new ReplaceLabelsOnNodeRequestPBImpl(
+                ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
+                .getNodeToLabels();
+        mgr.addToCluserNodeLabels(labels);
+        mgr.replaceLabelsOnNode(nodeToLabels);
+      } finally {
+        // release the mirror stream even if parsing or applying fails
+        is.close();
+      }
+    }
+
+    // Open and process editlog
+    editLogPath = new Path(rootDirPath, EDITLOG_FILENAME);
+    if (fs.exists(editLogPath)) {
+      is = fs.open(editLogPath);
+
+      try {
+        while (true) {
+          try {
+            // read edit log one by one
+            SerializedLogType type = SerializedLogType.values()[is.readInt()];
+
+            switch (type) {
+            case ADD_LABELS: {
+              Collection<String> labels =
+                  AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
+                      .getNodeLabelsList();
+              mgr.addToCluserNodeLabels(Sets.newHashSet(labels.iterator()));
+              break;
+            }
+            case REMOVE_LABELS: {
+              Collection<String> labels =
+                  RemoveFromClusterNodeLabelsRequestProto
+                      .parseDelimitedFrom(is).getNodeLabelsList();
+              mgr.removeFromClusterNodeLabels(labels);
+              break;
+            }
+            case NODE_TO_LABELS: {
+              Map<NodeId, Set<String>> map =
+                  new ReplaceLabelsOnNodeRequestPBImpl(
+                      ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
+                      .getNodeToLabels();
+              mgr.replaceLabelsOnNode(map);
+              break;
+            }
+            }
+          } catch (EOFException e) {
+            // EOF hit, break
+            break;
+          }
+        }
+      } finally {
+        // the edit log is recreated below; do not leave a reader open on it
+        is.close();
+      }
+    }
+
+    // Serialize current mirror to mirror.writing
+    Path writingMirrorPath =
+        new Path(rootDirPath, MIRROR_FILENAME + ".writing");
+    FSDataOutputStream os = fs.create(writingMirrorPath, true);
+    try {
+      ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
+          .newInstance(mgr.getClusterNodeLabels())).getProto()
+          .writeDelimitedTo(os);
+      ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
+          .newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
+    } finally {
+      os.close();
+    }
+
+    // Move mirror to mirror.old
+    if (fs.exists(mirrorPath)) {
+      fs.delete(oldMirrorPath, false);
+      fs.rename(mirrorPath, oldMirrorPath);
+    }
+
+    // move mirror.writing to mirror
+    // Stop here if the move fails: continuing would delete mirror.old and
+    // truncate the edit log, leaving no usable copy of the state on disk.
+    if (!fs.rename(writingMirrorPath, mirrorPath)) {
+      throw new IOException("Failed to rename " + writingMirrorPath + " to "
+          + mirrorPath);
+    }
+
+    // remove mirror.old
+    fs.delete(oldMirrorPath, false);
+
+    // create a new editlog file
+    editlogOs = fs.create(editLogPath, true);
+    editlogOs.close();
+
+    LOG.info("Finished write mirror at:" + mirrorPath.toString());
+    LOG.info("Finished create editlog file at:" + editLogPath.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java
new file mode 100644
index 0000000..033c034
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Base class for the persistence layer behind a
+ * {@link CommonNodeLabelsManager}. Subclasses persist label additions, label
+ * removals and node to labels updates, and replay them in {@link #recover()}.
+ */
+public abstract class NodeLabelsStore implements Closeable {
+ // manager this store belongs to, as passed to the constructor
+ protected final CommonNodeLabelsManager mgr;
+ // configuration handed to init(Configuration)
+ protected Configuration conf;
+
+ /**
+ * @param mgr manager this store belongs to
+ */
+ public NodeLabelsStore(CommonNodeLabelsManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Store node -> label
+ *
+ * @param nodeToLabels node to labels mapping to persist
+ */
+ public abstract void updateNodeToLabelsMappings(
+ Map<NodeId, Set<String>> nodeToLabels) throws IOException;
+
+ /**
+ * Store new labels
+ *
+ * @param label labels to persist
+ */
+ public abstract void storeNewClusterNodeLabels(Set<String> label)
+ throws IOException;
+
+ /**
+ * Remove labels
+ *
+ * @param labels labels whose removal should be persisted
+ */
+ public abstract void removeClusterNodeLabels(Collection<String> labels)
+ throws IOException;
+
+ /**
+ * Recover labels and node to labels mappings from store
+ */
+ public abstract void recover() throws IOException;
+
+ /**
+ * Remembers the configuration for later use by the store.
+ *
+ * @param conf configuration to keep
+ */
+ public void init(Configuration conf) throws Exception {
+ this.conf = conf;
+ }
+
+ /**
+ * @return the manager passed to the constructor
+ */
+ public CommonNodeLabelsManager getNodeLabelsManager() {
+ return mgr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java
new file mode 100644
index 0000000..b1b7f11
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event for node labels store operations; the operation is identified by
+ * its {@link NodeLabelsStoreEventType}.
+ */
+public class NodeLabelsStoreEvent extends
+ AbstractEvent<NodeLabelsStoreEventType> {
+ /**
+ * @param type kind of store operation this event represents
+ */
+ public NodeLabelsStoreEvent(NodeLabelsStoreEventType type) {
+ super(type);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java
new file mode 100644
index 0000000..efa2dbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+/**
+ * Kinds of {@link NodeLabelsStoreEvent}.
+ */
+public enum NodeLabelsStoreEventType {
+ // carried by RemoveClusterNodeLabels
+ REMOVE_LABELS,
+ // carried by StoreNewClusterNodeLabels
+ ADD_LABELS,
+ // carried by UpdateNodeToLabelsMappingsEvent
+ STORE_NODE_TO_LABELS
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java
new file mode 100644
index 0000000..ae78394
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Collection;
+
+/**
+ * Store event of type {@link NodeLabelsStoreEventType#REMOVE_LABELS} carrying
+ * the labels to remove.
+ */
+public class RemoveClusterNodeLabels extends NodeLabelsStoreEvent {
+  // assigned once in the constructor; the collection itself is not copied
+  private final Collection<String> labels;
+
+  /**
+   * @param labels labels carried by this event
+   */
+  public RemoveClusterNodeLabels(Collection<String> labels) {
+    super(NodeLabelsStoreEventType.REMOVE_LABELS);
+    this.labels = labels;
+  }
+
+  /**
+   * @return the labels passed to the constructor
+   */
+  public Collection<String> getLabels() {
+    return labels;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java
new file mode 100644
index 0000000..b478c6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Set;
+
+/**
+ * Store event of type {@link NodeLabelsStoreEventType#ADD_LABELS} carrying the
+ * labels to add.
+ */
+public class StoreNewClusterNodeLabels extends NodeLabelsStoreEvent {
+  // assigned once in the constructor; the set itself is not copied
+  private final Set<String> labels;
+
+  /**
+   * @param labels labels carried by this event
+   */
+  public StoreNewClusterNodeLabels(Set<String> labels) {
+    super(NodeLabelsStoreEventType.ADD_LABELS);
+    this.labels = labels;
+  }
+
+  /**
+   * @return the labels passed to the constructor
+   */
+  public Set<String> getLabels() {
+    return labels;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java
new file mode 100644
index 0000000..27eeb81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class UpdateNodeToLabelsMappingsEvent extends NodeLabelsStoreEvent {
+ private Map<NodeId, Set<String>> nodeToLabels;
+
+ public UpdateNodeToLabelsMappingsEvent(Map<NodeId, Set<String>> nodeToLabels) {
+ super(NodeLabelsStoreEventType.STORE_NODE_TO_LABELS);
+ this.nodeToLabels = nodeToLabels;
+ }
+
+ public Map<NodeId, Set<String>> getNodeToLabels() {
+ return nodeToLabels;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java
new file mode 100644
index 0000000..fcdf969
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+
+/**
+ * Test double for {@link CommonNodeLabelsManager}: installs an in-memory
+ * store that only remembers the arguments of the most recent store call, and
+ * an {@link InlineDispatcher} in place of the regular dispatcher.
+ */
+public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
+ // argument of the last updateNodeToLabelsMappings call on the store
+ Map<NodeId, Set<String>> lastNodeToLabels = null;
+ // argument of the last storeNewClusterNodeLabels call on the store
+ Collection<String> lastAddedlabels = null;
+ // argument of the last removeClusterNodeLabels call on the store
+ Collection<String> lastRemovedlabels = null;
+
+ /**
+ * Installs a store that records its inputs in the {@code last*} fields
+ * instead of persisting anything.
+ */
+ @Override
+ public void initNodeLabelStore(Configuration conf) {
+ this.store = new NodeLabelsStore(this) {
+
+ @Override
+ public void recover() throws IOException {
+ }
+
+ @Override
+ public void removeClusterNodeLabels(Collection<String> labels)
+ throws IOException {
+ lastRemovedlabels = labels;
+ }
+
+ @Override
+ public void updateNodeToLabelsMappings(
+ Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+ lastNodeToLabels = nodeToLabels;
+ }
+
+ @Override
+ public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
+ lastAddedlabels = label;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ };
+ }
+
+ /**
+ * Uses an {@link InlineDispatcher} as the manager's dispatcher.
+ */
+ @Override
+ protected void initDispatcher(Configuration conf) {
+ super.dispatcher = new InlineDispatcher();
+ }
+
+ @Override
+ protected void startDispatcher() {
+ // do nothing
+ }
+
+ @Override
+ protected void stopDispatcher() {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java
new file mode 100644
index 0000000..9749299
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.junit.Assert;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Shared assertions and small factories for node label tests.
+ */
+public class NodeLabelTestBase {
+  /**
+   * Asserts that both maps have the same size and that every key of
+   * {@code m1} is present in {@code m2} with equal labels.
+   */
+  public static void assertMapEquals(Map<NodeId, Set<String>> m1,
+      ImmutableMap<NodeId, Set<String>> m2) {
+    Assert.assertEquals(m1.size(), m2.size());
+    for (NodeId k : m1.keySet()) {
+      Assert.assertTrue(m2.containsKey(k));
+      assertCollectionEquals(m1.get(k), m2.get(k));
+    }
+  }
+
+  /**
+   * Asserts that every key of {@code m2} is present in {@code m1} with equal
+   * labels; {@code m1} may contain additional keys.
+   */
+  public static void assertMapContains(Map<NodeId, Set<String>> m1,
+      ImmutableMap<NodeId, Set<String>> m2) {
+    for (NodeId k : m2.keySet()) {
+      Assert.assertTrue(m1.containsKey(k));
+      assertCollectionEquals(m1.get(k), m2.get(k));
+    }
+  }
+
+  /**
+   * Asserts that both collections have the same size and contain the same
+   * elements, regardless of order.
+   *
+   * Callers pass hash based sets whose iteration order is unspecified, so the
+   * elements are compared as sets instead of being walked pairwise.
+   */
+  public static void assertCollectionEquals(Collection<String> c1,
+      Collection<String> c2) {
+    Assert.assertEquals(c1.size(), c2.size());
+    Assert.assertEquals(Sets.newHashSet(c1), Sets.newHashSet(c2));
+  }
+
+  /**
+   * @return a new mutable hash set holding {@code elements}
+   */
+  public static <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+
+  /**
+   * Parses {@code "host:port"} into a {@link NodeId}; a string without a
+   * colon is treated as a host with
+   * {@link CommonNodeLabelsManager#WILDCARD_PORT}.
+   */
+  public NodeId toNodeId(String str) {
+    if (str.contains(":")) {
+      int idx = str.indexOf(':');
+      NodeId id =
+          NodeId.newInstance(str.substring(0, idx),
+              Integer.valueOf(str.substring(idx + 1)));
+      return id;
+    } else {
+      return NodeId.newInstance(str, CommonNodeLabelsManager.WILDCARD_PORT);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
new file mode 100644
index 0000000..ea29f3a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for {@link CommonNodeLabelsManager}, run against
+ * {@link DummyCommonNodeLabelsManager} so that store calls can be inspected
+ * through its {@code last*} fields.
+ */
+public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
+  DummyCommonNodeLabelsManager mgr = null;
+
+  @Before
+  public void before() {
+    mgr = new DummyCommonNodeLabelsManager();
+    mgr.init(new Configuration());
+    mgr.start();
+  }
+
+  @After
+  public void after() {
+    mgr.stop();
+  }
+
+  @Test(timeout = 5000)
+  public void testAddRemovelabel() throws Exception {
+    // Add some label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("hello"));
+    assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello"));
+
+    mgr.addToCluserNodeLabels(ImmutableSet.of("world"));
+    mgr.addToCluserNodeLabels(toSet("hello1", "world1"));
+    assertCollectionEquals(mgr.lastAddedlabels,
+        Sets.newHashSet("hello1", "world1"));
+
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Sets.newHashSet("hello", "world", "hello1", "world1")));
+
+    // try to remove null, empty and non-existed label, should fail
+    for (String p : Arrays.asList(null, CommonNodeLabelsManager.NO_LABEL, "xx")) {
+      boolean caught = false;
+      try {
+        mgr.removeFromClusterNodeLabels(Arrays.asList(p));
+      } catch (IOException e) {
+        caught = true;
+      }
+      Assert.assertTrue("remove label should fail "
+          + "when label is null/empty/non-existed", caught);
+    }
+
+    // Remove some label
+    mgr.removeFromClusterNodeLabels(Arrays.asList("hello"));
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("hello"));
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("world", "hello1", "world1")));
+
+    mgr.removeFromClusterNodeLabels(Arrays
+        .asList("hello1", "world1", "world"));
+    Assert.assertTrue(mgr.lastRemovedlabels.containsAll(Sets.newHashSet(
+        "hello1", "world1", "world")));
+    Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
+  }
+
+  @Test(timeout = 5000)
+  public void testAddlabelWithCase() throws Exception {
+    // Add some label, case will not ignore here
+    mgr.addToCluserNodeLabels(ImmutableSet.of("HeLlO"));
+    assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("HeLlO"));
+    Assert.assertFalse(mgr.getClusterNodeLabels().containsAll(Arrays.asList("hello")));
+  }
+
+  @Test(timeout = 5000)
+  public void testAddInvalidlabel() throws IOException {
+    boolean caught = false;
+    try {
+      Set<String> set = new HashSet<String>();
+      set.add(null);
+      mgr.addToCluserNodeLabels(set);
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("null label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of(CommonNodeLabelsManager.NO_LABEL));
+    } catch (IOException e) {
+      caught = true;
+    }
+
+    Assert.assertTrue("empty label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("-?"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("invalid label charactor should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of(StringUtils.repeat("c", 257)));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("too long label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("-aaabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot start with \"-\"", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("_aaabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot start with \"_\"", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("a^aabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("aa[a]bbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testAddReplaceRemoveLabelsOnNodes() throws Exception {
+    // set a label on a node, but label doesn't exist
+    boolean caught = false;
+    try {
+      mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("node"), toSet("label")));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("trying to set a label to a node but "
+        + "label doesn't exist in repository should fail", caught);
+
+    // set a label on a node, but node is null or empty
+    // reset the flag, otherwise the assertion below passes on the strength
+    // of the previous failure alone
+    caught = false;
+    try {
+      mgr.replaceLabelsOnNode(ImmutableMap.of(
+          toNodeId(CommonNodeLabelsManager.NO_LABEL), toSet("label")));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("trying to add a empty node but succeeded", caught);
+
+    // set node->label one by one
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p3")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p2"), toNodeId("n2"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n2"), toSet("p3")));
+
+    // set bunch of node->label
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+        toNodeId("n1"), toSet("p1")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1"), toNodeId("n2"), toSet("p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n3"),
+        toSet("p3"), toNodeId("n1"), toSet("p1")));
+
+    /*
+     * n1: p1
+     * n2: p3
+     * n3: p3
+     */
+
+    // remove label on node
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n1"), CommonNodeLabelsManager.EMPTY_STRING_SET));
+
+    // add label on node
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2")));
+    assertMapEquals(
+        mgr.getNodeLabels(),
+        ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
+            toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
+            toSet("p2", "p3")));
+
+    // remove labels on node
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
+    Assert.assertEquals(0, mgr.getNodeLabels().size());
+    assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n1"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n2"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n3"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET));
+  }
+
+  @Test(timeout = 5000)
+  public void testRemovelabelWithNodes() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p2")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p3")));
+
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
+    assertMapEquals(mgr.getNodeLabels(),
+        ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1"));
+
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3"));
+    Assert.assertTrue(mgr.getNodeLabels().isEmpty());
+    Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3"));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db7f1653/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java
new file mode 100644
index 0000000..a7546cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
+ MockNodeLabelManager mgr = null;
+ Configuration conf = null;
+
+ private static class MockNodeLabelManager extends
+ CommonNodeLabelsManager {
+ @Override
+ protected void initDispatcher(Configuration conf) {
+ super.dispatcher = new InlineDispatcher();
+ }
+
+ @Override
+ protected void startDispatcher() {
+ // do nothing
+ }
+
+ @Override
+ protected void stopDispatcher() {
+ // do nothing
+ }
+ }
+
+ private FileSystemNodeLabelsStore getStore() {
+ return (FileSystemNodeLabelsStore) mgr.store;
+ }
+
+ @Before
+ public void before() throws IOException {
+ mgr = new MockNodeLabelManager();
+ conf = new Configuration();
+ File tempDir = File.createTempFile("nlb", ".tmp");
+ tempDir.delete();
+ tempDir.mkdirs();
+ tempDir.deleteOnExit();
+ conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
+ tempDir.getAbsolutePath());
+ mgr.init(conf);
+ mgr.start();
+ }
+
+ @After
+ public void after() throws IOException {
+ getStore().fs.delete(getStore().rootDirPath, true);
+ mgr.stop();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test(timeout = 10000)
+ public void testRecoverWithMirror() throws Exception {
+ mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+ mgr.addToCluserNodeLabels(toSet("p4"));
+ mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+ mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+ toNodeId("n2"), toSet("p2")));
+ mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+ toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+ toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+ /*
+ * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
+ */
+
+ mgr.removeFromClusterNodeLabels(toSet("p1"));
+ mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+ /*
+ * After removed p2: n2 p4: n4 p6: n6, n7
+ */
+ // shutdown mgr and start a new mgr
+ mgr.stop();
+
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+
+ // check variables
+ Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+ Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+ Arrays.asList("p2", "p4", "p6")));
+
+ assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+ toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+ toNodeId("n7"), toSet("p6")));
+
+ // stutdown mgr and start a new mgr
+ mgr.stop();
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+
+ // check variables
+ Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+ Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+ Arrays.asList("p2", "p4", "p6")));
+
+ assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+ toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+ toNodeId("n7"), toSet("p6")));
+ mgr.stop();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test(timeout = 10000)
+ public void testEditlogRecover() throws Exception {
+ mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+ mgr.addToCluserNodeLabels(toSet("p4"));
+ mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+ mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+ toNodeId("n2"), toSet("p2")));
+ mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+ toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+ toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+ /*
+ * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
+ */
+
+ mgr.removeFromClusterNodeLabels(toSet("p1"));
+ mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+ /*
+ * After removed p2: n2 p4: n4 p6: n6, n7
+ */
+ // shutdown mgr and start a new mgr
+ mgr.stop();
+
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+
+ // check variables
+ Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+ Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+ Arrays.asList("p2", "p4", "p6")));
+
+ assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+ toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+ toNodeId("n7"), toSet("p6")));
+ mgr.stop();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test//(timeout = 10000)
+ public void testSerilizationAfterRecovery() throws Exception {
+ mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+ mgr.addToCluserNodeLabels(toSet("p4"));
+ mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+ mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+ toNodeId("n2"), toSet("p2")));
+ mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+ toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+ toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+ /*
+ * node -> labels
+ * p1: n1
+ * p2: n2
+ * p3: n3
+ * p4: n4
+ * p5: n5
+ * p6: n6, n7
+ */
+
+ mgr.removeFromClusterNodeLabels(toSet("p1"));
+ mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+ /*
+ * After removed
+ * p2: n2
+ * p4: n4
+ * p6: n6, n7
+ */
+ // shutdown mgr and start a new mgr
+ mgr.stop();
+
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+ mgr.start();
+
+ // check variables
+ Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+ Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+ Arrays.asList("p2", "p4", "p6")));
+
+ assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+ toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+ toNodeId("n7"), toSet("p6")));
+
+ /*
+ * Add label p7,p8 then shutdown
+ */
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+ mgr.start();
+ mgr.addToCluserNodeLabels(toSet("p7", "p8"));
+ mgr.stop();
+
+ /*
+ * Restart, add label p9 and shutdown
+ */
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+ mgr.start();
+ mgr.addToCluserNodeLabels(toSet("p9"));
+ mgr.stop();
+
+ /*
+ * Recovery, and see if p9 added
+ */
+ mgr = new MockNodeLabelManager();
+ mgr.init(conf);
+ mgr.start();
+
+ // check variables
+ Assert.assertEquals(6, mgr.getClusterNodeLabels().size());
+ Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+ Arrays.asList("p2", "p4", "p6", "p7", "p8", "p9")));
+ mgr.stop();
+ }
+}