You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:54 UTC
[37/50] [abbrv] hadoop git commit: YARN-7875. Node Attribute store
for storing and recovering attributes. Contributed by Bibin A Chundatt.
YARN-7875. Node Attribute store for storing and recovering attributes. Contributed by Bibin A Chundatt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c022888
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c022888
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c022888
Branch: refs/heads/YARN-3409
Commit: 1c022888c3a19d98925d1a2ef4d263ee3fc67ead
Parents: c535be9
Author: Sunil G <su...@apache.org>
Authored: Fri Apr 6 07:09:27 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Sat Aug 25 21:10:57 2018 +0530
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 16 ++
.../yarn/nodelabels/NodeAttributeStore.java | 77 ++++++
.../yarn/nodelabels/NodeAttributesManager.java | 11 +
.../hadoop/yarn/nodelabels/RMNodeAttribute.java | 6 -
.../nodelabels/store/AbstractFSNodeStore.java | 2 +-
.../yarn/nodelabels/store/FSStoreOpHandler.java | 21 +-
.../store/op/AddNodeToAttributeLogOp.java | 71 +++++
.../nodelabels/store/op/FSNodeStoreLogOp.java | 17 ++
.../store/op/NodeAttributeMirrorOp.java | 64 +++++
.../store/op/RemoveNodeToAttributeLogOp.java | 71 +++++
.../store/op/ReplaceNodeToAttributeLogOp.java | 73 ++++++
.../yarn/nodelabels/store/op/package-info.java | 21 ++
.../src/main/resources/yarn-default.xml | 16 ++
.../FileSystemNodeAttributeStore.java | 102 ++++++++
.../nodelabels/NodeAttributesManagerImpl.java | 100 ++++++-
.../TestResourceTrackerService.java | 10 +
.../TestFileSystemNodeAttributeStore.java | 260 +++++++++++++++++++
.../nodelabels/TestNodeAttributesManager.java | 13 +-
18 files changed, 935 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b331381..eeadd93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3464,6 +3464,22 @@ public class YarnConfiguration extends Configuration {
+ "fs-store.root-dir";
/**
+ * Node-attribute configurations.
+ */
+ public static final String NODE_ATTRIBUTE_PREFIX =
+ YARN_PREFIX + "node-attribute.";
+ /**
+ * Node attribute store implementation class.
+ */
+ public static final String FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS =
+ NODE_ATTRIBUTE_PREFIX + "fs-store.impl.class";
+ /**
+ * File system node attribute store directory.
+ */
+ public static final String FS_NODE_ATTRIBUTE_STORE_ROOT_DIR =
+ NODE_ATTRIBUTE_PREFIX + "fs-store.root-dir";
+
+ /**
* Flag to indicate if the node labels feature enabled, by default it's
* disabled
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
new file mode 100644
index 0000000..8e9f9ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Store interface for persisting and recovering node to attribute mappings.
+ */
+public interface NodeAttributeStore extends Closeable {
+
+ /**
+ * Replace attributes on node.
+ *
+ * @param nodeToAttribute node to attribute list.
+ * @throws IOException
+ */
+ void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+ throws IOException;
+
+ /**
+ * Add attribute to node.
+ *
+ * @param nodeToAttribute node to attribute list.
+ * @throws IOException
+ */
+ void addNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+ throws IOException;
+
+ /**
+ * Remove attribute from node.
+ *
+ * @param nodeToAttribute node to attribute list.
+ * @throws IOException
+ */
+ void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+ throws IOException;
+
+ /**
+ * Initialize based on configuration and NodeAttributesManager.
+ *
+ * @param configuration configuration instance.
+ * @param mgr NodeAttributesManager instance.
+ * @throws Exception
+ */
+ void init(Configuration configuration, NodeAttributesManager mgr)
+ throws Exception;
+
+ /**
+ * Recover store on ResourceManager startup.
+ * @throws IOException
+ * @throws YarnException
+ */
+ void recover() throws IOException, YarnException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index ffa33cf..ec7d30d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
/**
* This class captures all interactions for Attributes with RM.
@@ -101,6 +103,15 @@ public abstract class NodeAttributesManager extends AbstractService {
public abstract Map<NodeAttribute, AttributeValue> getAttributesForNode(
String hostName);
+ /**
+ * Get all node to attributes mappings, filtered by attribute prefix.
+ *
+ * @param prefix prefixes to filter on; if empty or null, all are returned.
+ * @return List<NodeToAttributes> nodeToAttributes matching the filter.
+ */
+ public abstract List<NodeToAttributes> getNodeToAttributes(
+ Set<String> prefix);
+
// futuristic
// public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
index 5a709c6..3b2bd16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java
@@ -53,12 +53,6 @@ public class RMNodeAttribute extends AbstractLabel {
this.attribute = attribute;
}
- public RMNodeAttribute(String attributeName) {
- super(attributeName);
- attribute = NodeAttribute.newInstance(attributeName,
- NodeAttributeType.STRING, CommonNodeLabelsManager.NO_LABEL);
- }
-
public NodeAttributeType getAttributeType() {
return attribute.getAttributeType();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
index a47cacf..216fc79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java
@@ -64,7 +64,7 @@ public abstract class AbstractFSNodeStore<M> {
initFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
-
+ LOG.info("Created store directory :" + fsWorkingPath);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
index 0f7f53d..a626537 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.yarn.nodelabels.store;
-import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler
- .StoreType.NODE_LABEL_STORE;
+import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_ATTRIBUTE;
+import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_LABEL_STORE;
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.NodeAttributeMirrorOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
import java.util.HashMap;
import java.util.Map;
@@ -39,7 +43,7 @@ public class FSStoreOpHandler {
public enum StoreType {
NODE_LABEL_STORE,
- NODE_LABEL_ATTRIBUTE;
+ NODE_ATTRIBUTE
}
static {
@@ -47,13 +51,24 @@ public class FSStoreOpHandler {
mirrorOp = new HashMap<>();
// registerLog edit log operation
+
+ //Node Label Operations
registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class);
registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class);
registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class);
+ //NodeAttribute operation
+ registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE, AddNodeToAttributeLogOp.class);
+ registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE, RemoveNodeToAttributeLogOp.class);
+ registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE, ReplaceNodeToAttributeLogOp.class);
+
// registerLog Mirror op
+ // Node label mirror operation
registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class);
+ //Node attribute mirror operation
+ registerMirror(NODE_ATTRIBUTE, NodeAttributeMirrorOp.class);
+
}
private static void registerMirror(StoreType type,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
new file mode 100644
index 0000000..4b92bcf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system log operation that adds node to attribute mappings.
+ */
+public class AddNodeToAttributeLogOp
+ extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+ private List<NodeToAttributes> attributes;
+
+ public static final int OPCODE = 0;
+
+ @Override
+ public void write(OutputStream os, NodeAttributesManager mgr)
+ throws IOException {
+ ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+ .newInstance(AttributeMappingOperationType.ADD, attributes, false))
+ .getProto().writeDelimitedTo(os);
+ }
+
+ @Override
+ public void recover(InputStream is, NodeAttributesManager mgr)
+ throws IOException {
+ NodesToAttributesMappingRequest request =
+ new NodesToAttributesMappingRequestPBImpl(
+ YarnServerResourceManagerServiceProtos
+ .NodesToAttributesMappingRequestProto
+ .parseDelimitedFrom(is));
+ mgr.addNodeAttributes(getNodeToAttributesMap(request));
+ }
+
+ public AddNodeToAttributeLogOp setAttributes(
+ List<NodeToAttributes> attributesList) {
+ this.attributes = attributesList;
+ return this;
+ }
+
+ @Override
+ public int getOpCode() {
+ return OPCODE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
index cd739c0..bf4d1b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java
@@ -17,10 +17,18 @@
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.nodelabels.store.StoreOp;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Defines all FileSystem editlog operation. All node label and attribute
@@ -32,4 +40,13 @@ public abstract class FSNodeStoreLogOp<M>
implements StoreOp<OutputStream, InputStream, M> {
public abstract int getOpCode();
+
+ protected Map<String, Set<NodeAttribute>> getNodeToAttributesMap(
+ NodesToAttributesMappingRequest request) {
+ List<NodeToAttributes> attributes = request.getNodesToAttributes();
+ Map<String, Set<NodeAttribute>> nodeToAttrMap = new HashMap<>(); // node -> attributes
+ attributes.forEach((v) -> nodeToAttrMap // put() keeps only the last entry per node
+ .put(v.getNode(), new HashSet<>(v.getNodeAttributes())));
+ return nodeToAttrMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
new file mode 100644
index 0000000..dca0555
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * File system mirror operation to write and recover centralized attributes.
+ */
+public class NodeAttributeMirrorOp
+ extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+ @Override
+ public void write(OutputStream os, NodeAttributesManager mgr)
+ throws IOException {
+ ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+ .newInstance(AttributeMappingOperationType.REPLACE,
+ mgr.getNodeToAttributes(
+ ImmutableSet.of(NodeAttribute.PREFIX_CENTRALIZED)), false))
+ .getProto().writeDelimitedTo(os);
+ }
+
+ @Override
+ public void recover(InputStream is, NodeAttributesManager mgr)
+ throws IOException {
+ NodesToAttributesMappingRequest request =
+ new NodesToAttributesMappingRequestPBImpl(
+ YarnServerResourceManagerServiceProtos
+ .NodesToAttributesMappingRequestProto
+ .parseDelimitedFrom(is));
+ mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+ getNodeToAttributesMap(request));
+ }
+
+ @Override
+ public int getOpCode() {
+ return -1;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
new file mode 100644
index 0000000..1d13077
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system log operation that removes node to attribute mappings.
+ */
+public class RemoveNodeToAttributeLogOp
+ extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+ private List<NodeToAttributes> attributes;
+
+ public static final int OPCODE = 1;
+
+ @Override
+ public void write(OutputStream os, NodeAttributesManager mgr)
+ throws IOException {
+ ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+ .newInstance(AttributeMappingOperationType.REMOVE, attributes, false))
+ .getProto().writeDelimitedTo(os);
+ }
+
+ @Override
+ public void recover(InputStream is, NodeAttributesManager mgr)
+ throws IOException {
+ NodesToAttributesMappingRequest request =
+ new NodesToAttributesMappingRequestPBImpl(
+ YarnServerResourceManagerServiceProtos
+ .NodesToAttributesMappingRequestProto
+ .parseDelimitedFrom(is));
+ mgr.removeNodeAttributes(getNodeToAttributesMap(request));
+ }
+
+ public RemoveNodeToAttributeLogOp setAttributes(
+ List<NodeToAttributes> attrs) {
+ this.attributes = attrs;
+ return this;
+ }
+
+ @Override
+ public int getOpCode() {
+ return OPCODE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
new file mode 100644
index 0000000..54d7651
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.nodelabels.store.op;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * File system log operation that replaces node to attribute mappings.
+ */
+public class ReplaceNodeToAttributeLogOp
+ extends FSNodeStoreLogOp<NodeAttributesManager> {
+
+ private List<NodeToAttributes> attributes;
+ public static final int OPCODE = 2;
+
+ @Override
+ public void write(OutputStream os, NodeAttributesManager mgr)
+ throws IOException {
+ ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest
+ .newInstance(AttributeMappingOperationType.REPLACE, attributes, false))
+ .getProto().writeDelimitedTo(os);
+ }
+
+ @Override
+ public void recover(InputStream is, NodeAttributesManager mgr)
+ throws IOException {
+ NodesToAttributesMappingRequest request =
+ new NodesToAttributesMappingRequestPBImpl(
+ YarnServerResourceManagerServiceProtos
+ .NodesToAttributesMappingRequestProto
+ .parseDelimitedFrom(is));
+ // Only CENTRALIZED prefixed attributes are stored to the file system
+ mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+ getNodeToAttributesMap(request));
+ }
+
+ public ReplaceNodeToAttributeLogOp setAttributes(
+ List<NodeToAttributes> attrs) {
+ this.attributes = attrs;
+ return this;
+ }
+
+ @Override
+ public int getOpCode() {
+ return OPCODE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
new file mode 100644
index 0000000..f6fb3d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.nodelabels.store.op;
+import org.apache.hadoop.classification.InterfaceAudience;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e6d708f..b74fccd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3950,4 +3950,20 @@
<name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
<value>5</value>
</property>
+ <property>
+ <description>
+ URI for the NodeAttributeManager store directory. The default value is
+ /tmp/hadoop-yarn-${user}/node-attribute/ in the local filesystem.
+ </description>
+ <name>yarn.node-attribute.fs-store.root-dir</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>
+ Implementation class to use for the node attribute store.
+ </description>
+ <name>yarn.node-attribute.fs-store.impl.class</name>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
new file mode 100644
index 0000000..01df250
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore;
+import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
+import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link NodeAttributeStore} backed by the file system store support
+ * inherited from {@link AbstractFSNodeStore}. Each add, remove or replace
+ * request is wrapped in the matching log operation and handed to
+ * {@code writeToLog}; recovery and shutdown are delegated to the parent.
+ */
+public class FileSystemNodeAttributeStore
+    extends AbstractFSNodeStore<NodeAttributesManager>
+    implements NodeAttributeStore {
+
+  protected static final Log LOG =
+      LogFactory.getLog(FileSystemNodeAttributeStore.class);
+
+  protected static final String DEFAULT_DIR_NAME = "node-attribute";
+  protected static final String MIRROR_FILENAME = "nodeattribute.mirror";
+  protected static final String EDITLOG_FILENAME = "nodeattribute.editlog";
+
+  public FileSystemNodeAttributeStore() {
+    super(FSStoreOpHandler.StoreType.NODE_ATTRIBUTE);
+  }
+
+  /**
+   * Builds the fallback root directory,
+   * {@code file:///tmp/hadoop-yarn-${user}/node-attribute}, from the short
+   * name of the current user.
+   */
+  private String getDefaultFSNodeAttributeRootDir() throws IOException {
+    String shortUserName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    return "file:///tmp/hadoop-yarn-" + shortUserName + "/" + DEFAULT_DIR_NAME;
+  }
+
+  /**
+   * Initializes the store under the directory configured by
+   * {@link YarnConfiguration#FS_NODE_ATTRIBUTE_STORE_ROOT_DIR}, falling back
+   * to the per-user default directory.
+   */
+  @Override
+  public void init(Configuration conf, NodeAttributesManager mgr)
+      throws Exception {
+    StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME);
+    String rootDir = conf.get(
+        YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        getDefaultFSNodeAttributeRootDir());
+    initStore(conf, new Path(rootDir), schema, mgr);
+  }
+
+  @Override
+  public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute)
+      throws IOException {
+    writeToLog(
+        new ReplaceNodeToAttributeLogOp().setAttributes(nodeToAttribute));
+  }
+
+  @Override
+  public void addNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
+      throws IOException {
+    writeToLog(
+        new AddNodeToAttributeLogOp().setAttributes(nodeAttributeMapping));
+  }
+
+  @Override
+  public void removeNodeAttributes(List<NodeToAttributes> nodeAttributeMapping)
+      throws IOException {
+    writeToLog(
+        new RemoveNodeToAttributeLogOp().setAttributes(nodeAttributeMapping));
+  }
+
+  @Override
+  public void recover() throws IOException, YarnException {
+    // Explicit super call kept: recovery always uses the parent's routine.
+    super.recoverFromStore();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.closeFSStore();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index 04d74a8..b4686e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -32,24 +32,31 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.ArrayList;
+import java.util.List;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute;
import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
/**
* Manager holding the attributes to Labels.
@@ -63,7 +70,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
*/
public static final String EMPTY_ATTRIBUTE_VALUE = "";
- private Dispatcher dispatcher;
+ Dispatcher dispatcher;
+ NodeAttributeStore store;
// TODO may be we can have a better collection here.
// this will be updated to get the attributeName to NM mapping
@@ -121,7 +129,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
protected void initNodeAttributeStore(Configuration conf) throws Exception {
- // TODO to generalize and make use of the FileSystemNodeLabelsStore
+ this.store =getAttributeStoreClass(conf);
+ this.store.init(conf, this);
+ this.store.recover();
+ }
+
+  /**
+   * Instantiates the store class configured under
+   * {@link YarnConfiguration#FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS}, using
+   * {@link FileSystemNodeAttributeStore} as the default class.
+   *
+   * @param conf configuration to read the class from and pass to the instance
+   * @return a new store instance (despite the method name, not a class)
+   * @throws YarnRuntimeException wrapping any failure to resolve or
+   *           instantiate the class
+   */
+  private NodeAttributeStore getAttributeStoreClass(Configuration conf) {
+    try {
+      Class<? extends NodeAttributeStore> storeClass = conf.getClass(
+          YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+          FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+      return ReflectionUtils.newInstance(storeClass, conf);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate Node Attribute Store ", e);
+    }
}
private void internalUpdateAttributesOnNodes(
@@ -174,7 +196,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
LOG.info(logMsg);
- if (null != dispatcher) {
+ if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED
+ .equals(attributePrefix)) {
dispatcher.getEventHandler()
.handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
}
@@ -382,6 +405,32 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
}
+  /**
+   * Lists the attributes of every entry in {@code nodeCollections},
+   * optionally restricted to a set of attribute prefixes.
+   *
+   * @param prefix attribute prefixes to keep; {@code null} or an empty set
+   *          keeps every attribute
+   * @return one {@link NodeToAttributes} per entry, including entries whose
+   *         filtered attribute list is empty
+   */
+  @Override
+  public List<NodeToAttributes> getNodeToAttributes(Set<String> prefix) {
+    try {
+      readLock.lock();
+      List<NodeToAttributes> result = new ArrayList<>();
+      nodeCollections.forEach((nodeKey, nodeEntry) -> {
+        List<NodeAttribute> selected;
+        if (prefix != null && !prefix.isEmpty()) {
+          // Keep only attributes whose prefix was requested.
+          selected = new ArrayList<>();
+          for (NodeAttribute attribute : nodeEntry.attributes.keySet()) {
+            if (prefix.contains(attribute.getAttributePrefix())) {
+              selected.add(attribute);
+            }
+          }
+        } else {
+          selected = new ArrayList<>(nodeEntry.getAttributes().keySet());
+        }
+        result.add(NodeToAttributes.newInstance(nodeKey, selected));
+      });
+      return result;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
public void activateNode(NodeId nodeId, Resource resource) {
try {
writeLock.lock();
@@ -524,7 +573,29 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
// Dispatcher related code
protected void handleStoreEvent(NodeAttributesStoreEvent event) {
- // TODO Need to extend the File
+ List<NodeToAttributes> mappingList = new ArrayList<>();
+ Map<String, Map<NodeAttribute, AttributeValue>> nodeToAttr =
+ event.getNodeAttributeMappingList();
+ nodeToAttr.forEach((k, v) -> mappingList
+ .add(NodeToAttributes.newInstance(k, new ArrayList<>(v.keySet()))));
+ try {
+ switch (event.getOperation()) {
+ case REPLACE:
+ store.replaceNodeAttributes(mappingList);
+ break;
+ case ADD:
+ store.addNodeAttributes(mappingList);
+ break;
+ case REMOVE:
+ store.removeNodeAttributes(mappingList);
+ break;
+ default:
+ LOG.warn("Unsupported operation");
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to store attribute modification to storage");
+ throw new YarnRuntimeException(e);
+ }
}
@Override
@@ -549,7 +620,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
+  /**
+   * Convenience overload that delegates to the three-argument
+   * {@code processMapping}, passing {@link NodeAttribute#PREFIX_CENTRALIZED}
+   * as the attribute prefix.
+   */
private void processMapping(
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
AttributeMappingOperationType mappingType) throws IOException {
- processMapping(nodeAttributeMapping, mappingType, null);
+ processMapping(nodeAttributeMapping, mappingType,
+ NodeAttribute.PREFIX_CENTRALIZED);
}
private void processMapping(
@@ -564,4 +636,22 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
internalUpdateAttributesOnNodes(validMapping, mappingType,
newAttributesToBeAdded, attributePrefix);
}
+
+  /**
+   * Stops the dispatcher if it is an {@link AsyncDispatcher}. Does nothing
+   * when no dispatcher is set.
+   */
+  protected void stopDispatcher() {
+    // instanceof is false for null and, unlike a blind cast, cannot throw
+    // ClassCastException if a different Dispatcher type is ever assigned.
+    if (dispatcher instanceof AsyncDispatcher) {
+      ((AsyncDispatcher) dispatcher).stop();
+    }
+  }
+
+  /**
+   * Stops the dispatcher and then closes the attribute store.
+   *
+   * @throws Exception if stopping the dispatcher or closing the store fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      stopDispatcher();
+    } finally {
+      // Close the store even when stopping the dispatcher failed, so its
+      // resources are not leaked. The store field stays null until
+      // initNodeAttributeStore assigns it.
+      if (null != store) {
+        store.close();
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index a29e8a2..adb7fe0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -828,6 +830,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
+ conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+ FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+ File tempDir = File.createTempFile("nattr", ".tmp");
+ tempDir.delete();
+ tempDir.mkdirs();
+ tempDir.deleteOnExit();
+ conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+ tempDir.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
new file mode 100644
index 0000000..e2ee8b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Checks that node attributes written through
+ * {@link FileSystemNodeAttributeStore} are visible again after a new manager
+ * is started against the same store directory.
+ */
+public class TestFileSystemNodeAttributeStore {
+
+  private MockNodeAttributeManager mgr = null;
+  private Configuration conf = null;
+
+  /**
+   * Manager that uses an {@link InlineDispatcher} and skips dispatcher
+   * start/stop.
+   */
+  private static class MockNodeAttributeManager
+      extends NodeAttributesManagerImpl {
+    @Override
+    protected void initDispatcher(Configuration conf) {
+      super.dispatcher = new InlineDispatcher();
+    }
+
+    @Override
+    protected void startDispatcher() {
+      //Do nothing
+    }
+
+    @Override
+    protected void stopDispatcher() {
+      //Do nothing
+    }
+  }
+
+  @Before
+  public void before() throws IOException {
+    mgr = new MockNodeAttributeManager();
+    conf = new Configuration();
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    // Create the directory in a single step; createTempFile + delete +
+    // mkdirs leaves a window in which the path does not exist.
+    File tempDir = java.nio.file.Files.createTempDirectory("nattr").toFile();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
+    mgr.init(conf);
+    mgr.start();
+  }
+
+  @After
+  public void after() throws IOException {
+    FileSystemNodeAttributeStore fsStore =
+        ((FileSystemNodeAttributeStore) mgr.store);
+    fsStore.getFs().delete(fsStore.getFsWorkingPath(), true);
+    mgr.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testRecoverWithMirror() throws Exception {
+
+    //------host0----
+    // add       -GPU & FPGA
+    // remove    -GPU
+    // replace   -Docker
+    //------host1----
+    // add--GPU
+    NodeAttribute docker = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
+            NodeAttributeType.STRING, "docker-0");
+    NodeAttribute gpu = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+            NodeAttributeType.STRING, "nvidia");
+    NodeAttribute fpga = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
+            NodeAttributeType.STRING, "asus");
+
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
+    toAddAttributes.put("host1", ImmutableSet.of(gpu));
+    // Add node attribute
+    mgr.addNodeAttributes(toAddAttributes);
+
+    Assert.assertEquals("host0 size", 2,
+        mgr.getAttributesForNode("host0").size());
+    // Remove gpu from host0
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu));
+    mgr.removeNodeAttributes(toAddAttributes);
+
+    // Replace host0 attributes with docker
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(docker));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+    Map<NodeAttribute, AttributeValue> attrs =
+        mgr.getAttributesForNode("host0");
+    Assert.assertEquals(1, attrs.size());
+    Assert.assertEquals(docker, attrs.keySet().toArray()[0]);
+    mgr.stop();
+
+    // Start new attribute manager with same path
+    mgr = new MockNodeAttributeManager();
+    mgr.init(conf);
+    mgr.start();
+
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 1,
+        mgr.getAttributesForNode("host1").size());
+    attrs = mgr.getAttributesForNode("host0");
+    Assert.assertEquals(1, attrs.size());
+    Assert.assertEquals(docker, attrs.keySet().toArray()[0]);
+    //------host0----
+    // current       - docker
+    // replace       - gpu
+    //----- host1----
+    // current       - gpu
+    // add           - docker
+    toAddAttributes.clear();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+
+    toAddAttributes.clear();
+    toAddAttributes.put("host1", ImmutableSet.of(docker));
+    mgr.addNodeAttributes(toAddAttributes);
+    // Recover from mirror and edit log
+    mgr.stop();
+
+    mgr = new MockNodeAttributeManager();
+    mgr.init(conf);
+    mgr.start();
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+    attrs = mgr.getAttributesForNode("host0");
+    Assert.assertEquals(1, attrs.size());
+    Assert.assertEquals(gpu, attrs.keySet().toArray()[0]);
+    attrs = mgr.getAttributesForNode("host1");
+    Assert.assertTrue(attrs.keySet().contains(docker));
+    Assert.assertTrue(attrs.keySet().contains(gpu));
+  }
+
+  @Test(timeout = 10000)
+  public void testRecoverFromEditLog() throws Exception {
+    NodeAttribute docker = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
+            NodeAttributeType.STRING, "docker-0");
+    NodeAttribute gpu = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+            NodeAttributeType.STRING, "nvidia");
+    NodeAttribute fpga = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
+            NodeAttributeType.STRING, "asus");
+
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
+    toAddAttributes.put("host1", ImmutableSet.of(docker));
+
+    // Add node attribute
+    mgr.addNodeAttributes(toAddAttributes);
+
+    Assert.assertEquals("host0 size", 2,
+        mgr.getAttributesForNode("host0").size());
+
+    // Increase editlog operation
+    for (int i = 0; i < 5; i++) {
+      // Remove gpu from host0
+      toAddAttributes.clear();
+      toAddAttributes.put("host0", ImmutableSet.of(gpu));
+      mgr.removeNodeAttributes(toAddAttributes);
+
+      // Add docker to host1
+      toAddAttributes.clear();
+      toAddAttributes.put("host1", ImmutableSet.of(docker));
+      mgr.addNodeAttributes(toAddAttributes);
+
+      // Replace host0 attributes with gpu
+      toAddAttributes.clear();
+      toAddAttributes.put("host0", ImmutableSet.of(gpu));
+      mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+          toAddAttributes);
+
+      // Add gpu to host1
+      toAddAttributes.clear();
+      toAddAttributes.put("host1", ImmutableSet.of(gpu));
+      mgr.addNodeAttributes(toAddAttributes);
+    }
+    mgr.stop();
+
+    // Start new attribute manager with same path
+    mgr = new MockNodeAttributeManager();
+    mgr.init(conf);
+    mgr.start();
+
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+
+    toAddAttributes.clear();
+    NodeAttribute replaced =
+        NodeAttribute.newInstance("GPU2", NodeAttributeType.STRING, "nvidia2");
+    toAddAttributes.put("host0", ImmutableSet.of(replaced));
+    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        toAddAttributes);
+    mgr.stop();
+
+    mgr = new MockNodeAttributeManager();
+    mgr.init(conf);
+    mgr.start();
+    Map<NodeAttribute, AttributeValue> valueMap =
+        mgr.getAttributesForNode("host0");
+    Map.Entry<NodeAttribute, AttributeValue> entry =
+        valueMap.entrySet().iterator().next();
+    NodeAttribute attribute = entry.getKey();
+    Assert.assertEquals("host0 size", 1,
+        mgr.getAttributesForNode("host0").size());
+    Assert.assertEquals("host1 size", 2,
+        mgr.getAttributesForNode("host1").size());
+    checkNodeAttributeEqual(replaced, attribute);
+  }
+
+  /**
+   * Asserts that two attributes agree on type, name, prefix and value.
+   *
+   * @param atr1 expected attribute
+   * @param atr2 actual attribute
+   */
+  public void checkNodeAttributeEqual(NodeAttribute atr1, NodeAttribute atr2) {
+    Assert.assertEquals(atr1.getAttributeType(), atr2.getAttributeType());
+    Assert.assertEquals(atr1.getAttributeName(), atr2.getAttributeName());
+    Assert.assertEquals(atr1.getAttributePrefix(), atr2.getAttributePrefix());
+    Assert.assertEquals(atr1.getAttributeValue(), atr2.getAttributeValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c022888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
index 07968d4..b8c5bc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
@@ -23,7 +23,9 @@ import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.junit.Test;
@@ -31,6 +33,7 @@ import org.junit.Before;
import org.junit.After;
import org.junit.Assert;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,9 +52,17 @@ public class TestNodeAttributesManager {
new String[] {"host1", "host2", "host3"};
@Before
- public void init() {
+  public void init() throws IOException {
Configuration conf = new Configuration();
attributesManager = new NodeAttributesManagerImpl();
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    // Create the store directory in a single step; createTempFile + delete +
+    // mkdirs leaves a window in which the path does not exist.
+    File tempDir = java.nio.file.Files.createTempDirectory("nattr").toFile();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
attributesManager.init(conf);
attributesManager.start();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org