You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:54 UTC

[37/50] [abbrv] hadoop git commit: YARN-7875. Node Attribute store for storing and recovering attributes. Contributed by Bibin A Chundatt.

YARN-7875. Node Attribute store for storing and recovering attributes. Contributed by Bibin A Chundatt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c022888
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c022888
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c022888

Branch: refs/heads/YARN-3409
Commit: 1c022888c3a19d98925d1a2ef4d263ee3fc67ead
Parents: c535be9
Author: Sunil G <su...@apache.org>
Authored: Fri Apr 6 07:09:27 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Sat Aug 25 21:10:57 2018 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  16 ++
 .../yarn/nodelabels/NodeAttributeStore.java     |  77 ++++++
 .../yarn/nodelabels/NodeAttributesManager.java  |  11 +
 .../hadoop/yarn/nodelabels/RMNodeAttribute.java |   6 -
 .../nodelabels/store/AbstractFSNodeStore.java   |   2 +-
 .../yarn/nodelabels/store/FSStoreOpHandler.java |  21 +-
 .../store/op/AddNodeToAttributeLogOp.java       |  71 +++++
 .../nodelabels/store/op/FSNodeStoreLogOp.java   |  17 ++
 .../store/op/NodeAttributeMirrorOp.java         |  64 +++++
 .../store/op/RemoveNodeToAttributeLogOp.java    |  71 +++++
 .../store/op/ReplaceNodeToAttributeLogOp.java   |  73 ++++++
 .../yarn/nodelabels/store/op/package-info.java  |  21 ++
 .../src/main/resources/yarn-default.xml         |  16 ++
 .../FileSystemNodeAttributeStore.java           | 102 ++++++++
 .../nodelabels/NodeAttributesManagerImpl.java   | 100 ++++++-
 .../TestResourceTrackerService.java             |  10 +
 .../TestFileSystemNodeAttributeStore.java       | 260 +++++++++++++++++++
 .../nodelabels/TestNodeAttributesManager.java   |  13 +-
 18 files changed, 935 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b331381..eeadd93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3464,6 +3464,22 @@ public class YarnConfiguration extends Configuration {
       + "fs-store.root-dir";
 
   /**
+   * Node-attribute configurations.
+   */
+  public static final String NODE_ATTRIBUTE_PREFIX =
+      YARN_PREFIX + "node-attribute.";
+  /**
+   * Node attribute store implementation class.
+   */
+  public static final String FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS =
+      NODE_ATTRIBUTE_PREFIX + "fs-store.impl.class";
+  /**
+   * File system node attribute store directory.
+   */
+  public static final String FS_NODE_ATTRIBUTE_STORE_ROOT_DIR =
+      NODE_ATTRIBUTE_PREFIX + "fs-store.root-dir";
+
+  /**
    * Flag to indicate if the node labels feature enabled, by default it's
    * disabled
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
new file mode 100644
index 0000000..8e9f9ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface class for Node attribute store.
+ */
+public interface NodeAttributeStore extends Closeable {
+
+  /**
+   * Replace attributes on node.
+   *
+   * @param nodeToAttribute node to attribute list.
+   * @throws IOException
+   */
+  void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+      throws IOException;
+
+  /**
+   * Add attribute to node.
+   *
+   * @param nodeToAttribute node to attribute list.
+   * @throws IOException
+   */
+  void addNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+      throws IOException;
+
+  /**
+   * Remove attribute from node.
+   *
+   * @param nodeToAttribute node to attribute list.
+   * @throws IOException
+   */
+  void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+      throws IOException;
+
+  /**
+   * Initialize based on configuration and NodeAttributesManager.
+   *
+   * @param configuration configuration instance.
+   * @param mgr nodeattributemanager instance.
+   * @throws Exception
+   */
+  void init(Configuration configuration, NodeAttributesManager mgr)
+      throws Exception;
+
+  /**
+   * Recover store on resourcemanager startup.
+   * @throws IOException
+   * @throws YarnException
+   */
+  void recover() throws IOException, YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index ffa33cf..ec7d30d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.yarn.nodelabels;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 
 /**
  * This class captures all interactions for Attributes with RM.
@@ -101,6 +103,15 @@ public abstract class NodeAttributesManager extends AbstractService {
   public abstract Map<NodeAttribute, AttributeValue> getAttributesForNode(
       String hostName);
 
+  /**
+   * Get All node to Attributes list based on filter.
+   *
+   * @return List<NodeToAttributes> nodeToAttributes matching filter. If empty
+   * or null is passed as argument, all mappings are returned.
+   */
+  public abstract List<NodeToAttributes> getNodeToAttributes(
+      Set<String> prefix);
+
   // futuristic
   // public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
index 5a709c6..3b2bd16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
@@ -53,12 +53,6 @@ public class RMNodeAttribute extends AbstractLabel {
     this.attribute = attribute;
   }
 
-  public RMNodeAttribute(String attributeName) {
-    super(attributeName);
-    attribute = NodeAttribute.newInstance(attributeName,
-        NodeAttributeType.STRING, CommonNodeLabelsManager.NO_LABEL);
-  }
-
   public NodeAttributeType getAttributeType() {
     return attribute.getAttributeType();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
index a47cacf..216fc79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
@@ -64,7 +64,7 @@ public abstract class AbstractFSNodeStore<M> {
     initFileSystem(conf);
     // mkdir of root dir path
     fs.mkdirs(fsWorkingPath);
-
+    LOG.info("Created store directory :" + fsWorkingPath);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
index 0f7f53d..a626537 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.yarn.nodelabels.store;
 
-import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler
-    .StoreType.NODE_LABEL_STORE;
+import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_ATTRIBUTE;
+import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_LABEL_STORE;
 import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
 import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.NodeAttributeMirrorOp;
 import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp;
 import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp;
 import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -39,7 +43,7 @@ public class FSStoreOpHandler {
 
   public enum StoreType {
     NODE_LABEL_STORE,
-    NODE_LABEL_ATTRIBUTE;
+    NODE_ATTRIBUTE
   }
 
   static {
@@ -47,13 +51,24 @@ public class FSStoreOpHandler {
     mirrorOp = new HashMap<>();
 
     // registerLog edit log operation
+
+    //Node Label Operations
     registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class);
     registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class);
     registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class);
 
+    //NodeAttribute operation
+    registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE, AddNodeToAttributeLogOp.class);
+    registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE, RemoveNodeToAttributeLogOp.class);
+    registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE, ReplaceNodeToAttributeLogOp.class);
+
     // registerLog Mirror op
 
+    // Node label mirror operation
     registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class);
+    //Node attribute mirror operation
+    registerMirror(NODE_ATTRIBUTE, NodeAttributeMirrorOp.class);
+
   }
 
   private static void registerMirror(StoreType type,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
new file mode 100644
index 0000000..4b92bcf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system Add Node to attribute mapping.
+ */
+public class AddNodeToAttributeLogOp
+    extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+  private List<NodeToAttributes> attributes;
+
+  public static final int OPCODE = 0;
+
+  @Override
+  public void write(OutputStream os, NodeAttributesManager mgr)
+      throws IOException {
+    ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+        .newInstance(AttributeMappingOperationType.ADD, attributes, false))
+        .getProto().writeDelimitedTo(os);
+  }
+
+  @Override
+  public void recover(InputStream is, NodeAttributesManager mgr)
+      throws IOException {
+    NodesToAttributesMappingRequest request =
+        new NodesToAttributesMappingRequestPBImpl(
+            YarnServerResourceManagerServiceProtos
+                .NodesToAttributesMappingRequestProto
+                .parseDelimitedFrom(is));
+    mgr.addNodeAttributes(getNodeToAttributesMap(request));
+  }
+
+  public AddNodeToAttributeLogOp setAttributes(
+      List<NodeToAttributes> attributesList) {
+    this.attributes = attributesList;
+    return this;
+  }
+
+  @Override
+  public int getOpCode() {
+    return OPCODE;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
index cd739c0..bf4d1b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.yarn.nodelabels.store.op;
 
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.nodelabels.store.StoreOp;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Defines all FileSystem editlog operation. All node label and attribute
@@ -32,4 +40,13 @@ public abstract class FSNodeStoreLogOp<M>
     implements StoreOp<OutputStream, InputStream, M> {
 
   public abstract int getOpCode();
+
+  protected Map<String, Set<NodeAttribute>> getNodeToAttributesMap(
+      NodesToAttributesMappingRequest request) {
+    List<NodeToAttributes> attributes = request.getNodesToAttributes();
+    Map<String, Set<NodeAttribute>> nodeToAttrMap = new HashMap<>();
+    attributes.forEach((v) -> nodeToAttrMap
+        .put(v.getNode(), new HashSet<>(v.getNodeAttributes())));
+    return nodeToAttrMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
new file mode 100644
index 0000000..dca0555
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * File System Node Attribute Mirror read and write operation.
+ */
+public class NodeAttributeMirrorOp
+    extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+  @Override
+  public void write(OutputStream os, NodeAttributesManager mgr)
+      throws IOException {
+    ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+        .newInstance(AttributeMappingOperationType.REPLACE,
+            mgr.getNodeToAttributes(
+                ImmutableSet.of(NodeAttribute.PREFIX_CENTRALIZED)), false))
+        .getProto().writeDelimitedTo(os);
+  }
+
+  @Override
+  public void recover(InputStream is, NodeAttributesManager mgr)
+      throws IOException {
+    NodesToAttributesMappingRequest request =
+        new NodesToAttributesMappingRequestPBImpl(
+            YarnServerResourceManagerServiceProtos
+                .NodesToAttributesMappingRequestProto
+                .parseDelimitedFrom(is));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        getNodeToAttributesMap(request));
+  }
+
+  @Override
+  public int getOpCode() {
+    return -1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
new file mode 100644
index 0000000..1d13077
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system remove node attribute from node operation.
+ */
+public class RemoveNodeToAttributeLogOp
+    extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+  private List<NodeToAttributes> attributes;
+
+  public static final int OPCODE = 1;
+
+  @Override
+  public void write(OutputStream os, NodeAttributesManager mgr)
+      throws IOException {
+    ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+        .newInstance(AttributeMappingOperationType.REMOVE, attributes, false))
+        .getProto().writeDelimitedTo(os);
+  }
+
+  @Override
+  public void recover(InputStream is, NodeAttributesManager mgr)
+      throws IOException {
+    NodesToAttributesMappingRequest request =
+        new NodesToAttributesMappingRequestPBImpl(
+            YarnServerResourceManagerServiceProtos
+                .NodesToAttributesMappingRequestProto
+                .parseDelimitedFrom(is));
+    mgr.removeNodeAttributes(getNodeToAttributesMap(request));
+  }
+
+  public RemoveNodeToAttributeLogOp setAttributes(
+      List<NodeToAttributes> attrs) {
+    this.attributes = attrs;
+    return this;
+  }
+
+  @Override
+  public int getOpCode() {
+    return OPCODE;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
new file mode 100644
index 0000000..54d7651
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system replace node attribute on node operation.
+ */
+public class ReplaceNodeToAttributeLogOp
+    extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+  private List<NodeToAttributes> attributes;
+  public static final int OPCODE = 2;
+
+  @Override
+  public void write(OutputStream os, NodeAttributesManager mgr)
+      throws IOException {
+    ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+        .newInstance(AttributeMappingOperationType.REPLACE, attributes, false))
+        .getProto().writeDelimitedTo(os);
+  }
+
+  @Override
+  public void recover(InputStream is, NodeAttributesManager mgr)
+      throws IOException {
+    NodesToAttributesMappingRequest request =
+        new NodesToAttributesMappingRequestPBImpl(
+            YarnServerResourceManagerServiceProtos
+                .NodesToAttributesMappingRequestProto
+                .parseDelimitedFrom(is));
+    //Only CENTRALIZED attributes are stored in the file system
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        getNodeToAttributesMap(request));
+  }
+
+  public ReplaceNodeToAttributeLogOp setAttributes(
+      List<NodeToAttributes> attrs) {
+    this.attributes = attrs;
+    return this;
+  }
+
+  @Override
+  public int getOpCode() {
+    return OPCODE;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
new file mode 100644
index 0000000..f6fb3d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.nodelabels.store.op;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e6d708f..b74fccd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3950,4 +3950,20 @@
     <name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
     <value>5</value>
   </property>
+  <property>
+    <description>
+      URI of the root directory for the file system node attribute store. The
+      default is /tmp/hadoop-yarn-${user}/node-attribute/ in the local filesystem.
+    </description>
+    <name>yarn.node-attribute.fs-store.root-dir</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+      Class that implements the node attribute store.
+    </description>
+    <name>yarn.node-attribute.fs-store.impl.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
new file mode 100644
index 0000000..01df250
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore;
+import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
+import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * File system based implementation of the node attribute store.
+ */
+public class FileSystemNodeAttributeStore
+    extends AbstractFSNodeStore<NodeAttributesManager>
+    implements NodeAttributeStore {
+
+  protected static final Log LOG =
+      LogFactory.getLog(FileSystemNodeAttributeStore.class);
+
+  protected static final String DEFAULT_DIR_NAME = "node-attribute";
+  protected static final String MIRROR_FILENAME = "nodeattribute.mirror";
+  protected static final String EDITLOG_FILENAME = "nodeattribute.editlog";
+
+  public FileSystemNodeAttributeStore() {
+    super(FSStoreOpHandler.StoreType.NODE_ATTRIBUTE);
+  }
+
+  private String getDefaultFSNodeAttributeRootDir() throws IOException {
+    // default is local: file:///tmp/hadoop-yarn-${user}/node-attribute
+    return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser()
+        .getShortUserName() + "/" + DEFAULT_DIR_NAME;
+  }
+
+  @Override
+  public void init(Configuration conf, NodeAttributesManager mgr)
+      throws Exception {
+    StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME);
+    initStore(conf, new Path(
+        conf.get(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+            getDefaultFSNodeAttributeRootDir())), schema, mgr);
+  }
+
+  @Override
+  public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+      throws IOException {
+    ReplaceNodeToAttributeLogOp op = new ReplaceNodeToAttributeLogOp();
+    writeToLog(op.setAttributes(nodeToAttribute));
+  }
+
+  @Override
+  public void addNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
+      throws IOException {
+    AddNodeToAttributeLogOp op = new AddNodeToAttributeLogOp();
+    writeToLog(op.setAttributes(nodeAttributeMapping));
+  }
+
+  @Override
+  public void removeNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
+      throws IOException {
+    RemoveNodeToAttributeLogOp op = new RemoveNodeToAttributeLogOp();
+    writeToLog(op.setAttributes(nodeAttributeMapping));
+  }
+
+  @Override
+  public void recover() throws IOException, YarnException {
+    super.recoverFromStore();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.closeFSStore();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index 04d74a8..b4686e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -32,24 +32,31 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute;
 import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 
 /**
  * Manager holding the attributes to Labels.
@@ -63,7 +70,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
    */
   public static final String EMPTY_ATTRIBUTE_VALUE = "";
 
-  private Dispatcher dispatcher;
+  Dispatcher dispatcher;
+  NodeAttributeStore store;
 
   // TODO may be we can have a better collection here.
   // this will be updated to get the attributeName to NM mapping
@@ -121,7 +129,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
   }
 
   protected void initNodeAttributeStore(Configuration conf) throws Exception {
-    // TODO to generalize and make use of the FileSystemNodeLabelsStore
+    this.store =getAttributeStoreClass(conf);
+    this.store.init(conf, this);
+    this.store.recover();
+  }
+
+  private NodeAttributeStore getAttributeStoreClass(Configuration conf) {
+    try {
+      return ReflectionUtils.newInstance(
+          conf.getClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+              FileSystemNodeAttributeStore.class, NodeAttributeStore.class),
+          conf);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate Node Attribute Store ", e);
+    }
   }
 
   private void internalUpdateAttributesOnNodes(
@@ -174,7 +196,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
       LOG.info(logMsg);
 
-      if (null != dispatcher) {
+      if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED
+          .equals(attributePrefix)) {
         dispatcher.getEventHandler()
             .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
       }
@@ -382,6 +405,32 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
     }
   }
 
+  @Override
+  public List<NodeToAttributes> getNodeToAttributes(Set<String> prefix) {
+    try {
+      readLock.lock();
+      List<NodeToAttributes> nodeToAttributes = new ArrayList<>();
+      nodeCollections.forEach((k, v) -> {
+        List<NodeAttribute> attrs;
+        if (prefix == null || prefix.isEmpty()) {
+          attrs = new ArrayList<>(v.getAttributes().keySet());
+        } else {
+          attrs = new ArrayList<>();
+          for (Entry<NodeAttribute, AttributeValue> nodeAttr : v.attributes
+              .entrySet()) {
+            if (prefix.contains(nodeAttr.getKey().getAttributePrefix())) {
+              attrs.add(nodeAttr.getKey());
+            }
+          }
+        }
+        nodeToAttributes.add(NodeToAttributes.newInstance(k, attrs));
+      });
+      return nodeToAttributes;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public void activateNode(NodeId nodeId, Resource resource) {
     try {
       writeLock.lock();
@@ -524,7 +573,29 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
   // Dispatcher related code
   protected void handleStoreEvent(NodeAttributesStoreEvent event) {
-    // TODO Need to extend the File
+    List<NodeToAttributes> mappingList = new ArrayList<>();
+    Map<String, Map<NodeAttribute, AttributeValue>> nodeToAttr =
+        event.getNodeAttributeMappingList();
+    nodeToAttr.forEach((k, v) -> mappingList
+        .add(NodeToAttributes.newInstance(k, new ArrayList<>(v.keySet()))));
+    try {
+      switch (event.getOperation()) {
+      case REPLACE:
+        store.replaceNodeAttributes(mappingList);
+        break;
+      case ADD:
+        store.addNodeAttributes(mappingList);
+        break;
+      case REMOVE:
+        store.removeNodeAttributes(mappingList);
+        break;
+      default:
+        LOG.warn("Unsupported operation");
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to store attribute modification to storage");
+      throw new YarnRuntimeException(e);
+    }
   }
 
   @Override
@@ -549,7 +620,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
   private void processMapping(
       Map<String, Set<NodeAttribute>> nodeAttributeMapping,
       AttributeMappingOperationType mappingType) throws IOException {
-    processMapping(nodeAttributeMapping, mappingType, null);
+    processMapping(nodeAttributeMapping, mappingType,
+        NodeAttribute.PREFIX_CENTRALIZED);
   }
 
   private void processMapping(
@@ -564,4 +636,22 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
     internalUpdateAttributesOnNodes(validMapping, mappingType,
         newAttributesToBeAdded, attributePrefix);
   }
+
+  protected void stopDispatcher() {
+    // Stop only an AsyncDispatcher; null or any other Dispatcher is a no-op.
+    if (dispatcher instanceof AsyncDispatcher) {
+      ((AsyncDispatcher) dispatcher).stop();
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // stop the dispatcher before closing the store
+    stopDispatcher();
+
+    // only close the store if one was initialized
+    if (null != store) {
+      store.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index a29e8a2..adb7fe0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -828,6 +830,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Configuration conf = new Configuration();
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    File tempDir = File.createTempFile("nattr", ".tmp");
+    tempDir.delete();
+    tempDir.mkdirs();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
     rm = new MockRM(conf);
     rm.start();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
new file mode 100644
index 0000000..e2ee8b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class TestFileSystemNodeAttributeStore {
+
+  private MockNodeAttrbuteManager mgr = null;
+  private Configuration conf = null;
+
+  private static class MockNodeAttrbuteManager
+      extends NodeAttributesManagerImpl {
+    @Override
+    protected void initDispatcher(Configuration conf) {
+      super.dispatcher = new InlineDispatcher();
+    }
+
+    @Override
+    protected void startDispatcher() {
+      //Do nothing
+    }
+
+    @Override
+    protected void stopDispatcher() {
+      //Do nothing
+    }
+  }
+
+  @Before
+  public void before() throws IOException {
+    mgr = new MockNodeAttrbuteManager();
+    conf = new Configuration();
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    File tempDir = File.createTempFile("nattr", ".tmp");
+    tempDir.delete();
+    tempDir.mkdirs();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
+    mgr.init(conf);
+    mgr.start();
+  }
+
+  @After
+  public void after() throws IOException {
+    FileSystemNodeAttributeStore fsStore =
+        ((FileSystemNodeAttributeStore) mgr.store);
+    fsStore.getFs().delete(fsStore.getFsWorkingPath(), true);
+    mgr.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testRecoverWithMirror() throws Exception {
+
+    //------host0----
+    // add       -GPU & FPGA
+    // remove    -GPU
+    // replace   -Docker
+    //------host1----
+    // add--GPU
+    NodeAttribute docker = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
+            NodeAttributeType.STRING, "docker-0");
+    NodeAttribute gpu = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+            NodeAttributeType.STRING, "nvidia");
+    NodeAttribute fpga = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
+            NodeAttributeType.STRING, "asus");
+
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
+    toAddAttributes.put("host1", ImmutableSet.of(gpu));
+    // Add node attribute
+    mgr.addNodeAttributes(toAddAttributes);
+
+    Assert.assertEquals("host0 size", 2,
+        mgr.getAttributesForNode("host0").size());
+    // Remove GPU from host0
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu));
+    mgr.removeNodeAttributes(toAddAttributes);
+
+    // replace nodeattribute
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(docker));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+    Map<NodeAttribute, AttributeValue> attrs =
+        mgr.getAttributesForNode("host0");
+    Assert.assertEquals(attrs.size(), 1);
+    Assert.assertEquals(attrs.keySet().toArray()[0], docker);
+    mgr.stop();
+
+    // Start new attribute manager with same path
+    mgr = new MockNodeAttrbuteManager();
+    mgr.init(conf);
+    mgr.start();
+
+    mgr.getAttributesForNode("host0");
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 1,
+        mgr.getAttributesForNode("host1").size());
+    attrs = mgr.getAttributesForNode("host0");
+    Assert.assertEquals(attrs.size(), 1);
+    Assert.assertEquals(attrs.keySet().toArray()[0], docker);
+    //------host0----
+    // current       - docker
+    // replace       - gpu
+    //----- host1----
+    // current       - gpu
+    // add           - docker
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+
+    toAddAttributes.clear();
+    toAddAttributes.put("host1", ImmutableSet.of(docker));
+    mgr.addNodeAttributes(toAddAttributes);
+    // Recover from mirror and edit log
+    mgr.stop();
+
+    mgr = new MockNodeAttrbuteManager();
+    mgr.init(conf);
+    mgr.start();
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+    attrs = mgr.getAttributesForNode("host0");
+    Assert.assertEquals(attrs.size(), 1);
+    Assert.assertEquals(attrs.keySet().toArray()[0], gpu);
+    attrs = mgr.getAttributesForNode("host1");
+    Assert.assertTrue(attrs.keySet().contains(docker));
+    Assert.assertTrue(attrs.keySet().contains(gpu));
+  }
+
+  @Test(timeout = 10000)
+  public void testRecoverFromEditLog() throws Exception {
+    NodeAttribute docker = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
+            NodeAttributeType.STRING, "docker-0");
+    NodeAttribute gpu = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+            NodeAttributeType.STRING, "nvidia");
+    NodeAttribute fpga = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
+            NodeAttributeType.STRING, "asus");
+
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
+    toAddAttributes.put("host1", ImmutableSet.of(docker));
+
+    // Add node attribute
+    mgr.addNodeAttributes(toAddAttributes);
+
+    Assert.assertEquals("host0 size", 2,
+        mgr.getAttributesForNode("host0").size());
+
+    // Append more operations to the edit log
+    for (int i = 0; i < 5; i++) {
+      // Remove gpu from host0
+      toAddAttributes.clear();
+      toAddAttributes.put("host0", ImmutableSet.of(gpu));
+      mgr.removeNodeAttributes(toAddAttributes);
+
+      // Add docker to host1
+      toAddAttributes.clear();
+      toAddAttributes.put("host1", ImmutableSet.of(docker));
+      mgr.addNodeAttributes(toAddAttributes);
+
+      // Replace host0 attributes with gpu
+      toAddAttributes.clear();
+      toAddAttributes.put("host0", ImmutableSet.of(gpu));
+      mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+          toAddAttributes);
+
+      // Add gpu to host1
+      toAddAttributes.clear();
+      toAddAttributes.put("host1", ImmutableSet.of(gpu));
+      mgr.addNodeAttributes(toAddAttributes);
+    }
+    mgr.stop();
+
+    // Start new attribute manager with same path
+    mgr = new MockNodeAttrbuteManager();
+    mgr.init(conf);
+    mgr.start();
+
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+
+    toAddAttributes.clear();
+    NodeAttribute replaced =
+        NodeAttribute.newInstance("GPU2", NodeAttributeType.STRING, "nvidia2");
+    toAddAttributes.put("host0", ImmutableSet.of(replaced));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+    mgr.stop();
+
+    mgr = new MockNodeAttrbuteManager();
+    mgr.init(conf);
+    mgr.start();
+    Map<NodeAttribute, AttributeValue> valueMap =
+        mgr.getAttributesForNode("host0");
+    Map.Entry<NodeAttribute, AttributeValue> entry =
+        valueMap.entrySet().iterator().next();
+    NodeAttribute attribute = entry.getKey();
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+    checkNodeAttributeEqual(replaced, attribute);
+  }
+
+  public void checkNodeAttributeEqual(NodeAttribute atr1, NodeAttribute atr2) {
+    Assert.assertEquals(atr1.getAttributeType(), atr2.getAttributeType());
+    Assert.assertEquals(atr1.getAttributeName(), atr2.getAttributeName());
+    Assert.assertEquals(atr1.getAttributePrefix(), atr2.getAttributePrefix());
+    Assert.assertEquals(atr1.getAttributeValue(), atr2.getAttributeValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
index 07968d4..b8c5bc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
@@ -23,7 +23,9 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.junit.Test;
@@ -31,6 +33,7 @@ import org.junit.Before;
 import org.junit.After;
 import org.junit.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,9 +52,17 @@ public class TestNodeAttributesManager {
       new String[] {"host1", "host2", "host3"};
 
   @Before
-  public void init() {
+  public void init() throws IOException {
     Configuration conf = new Configuration();
     attributesManager = new NodeAttributesManagerImpl();
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    File tempDir = File.createTempFile("nattr", ".tmp");
+    tempDir.delete();
+    tempDir.mkdirs();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
     attributesManager.init(conf);
     attributesManager.start();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org