You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/20 03:38:44 UTC
git commit: TEZ-628. AMNodeMap should handle NodeReports for unknown nodes. (sseth)
Updated Branches:
refs/heads/master 808be4d04 -> f9c9c0c9c
TEZ-628. AMNodeMap should handle NodeReports for unknown nodes. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f9c9c0c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f9c9c0c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f9c9c0c9
Branch: refs/heads/master
Commit: f9c9c0c9c49ab973cea7920c26c494e23b788c3a
Parents: 808be4d
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Nov 19 18:38:19 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Nov 19 18:38:19 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/node/AMNodeMap.java | 9 +++
.../tez/dag/app/rm/node/TestAMNodeMap.java | 85 ++++++++++++++++++++
2 files changed, 94 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9c9c0c9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
index fdeeec8..0336a9e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
@@ -141,6 +141,15 @@ public class AMNodeMap extends AbstractService implements
numClusterNodes = event.getNodeCount();
computeIgnoreBlacklisting();
break;
+ case N_TURNED_UNHEALTHY:
+ case N_TURNED_HEALTHY:
+ AMNode amNode = nodeMap.get(nodeId);
+ if (amNode == null) {
+ LOG.info("Ignoring RM Health Update for unknwon node: " + nodeId);
+ } else {
+ amNode.handle(rEvent);
+ }
+ break;
default:
nodeMap.get(nodeId).handle(rEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9c9c0c9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
new file mode 100644
index 0000000..31304a9
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.AppContext;
+import org.junit.Test;
+
+public class TestAMNodeMap {
+
+ @Test
+ @SuppressWarnings({ "resource", "rawtypes" })
+ public void testHealthUpdateKnownNode() {
+
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(new Configuration());
+ dispatcher.start();
+ EventHandler eventHandler = dispatcher.getEventHandler();
+
+ AppContext appContext = mock(AppContext.class);
+
+ AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+
+ NodeId nodeId = NodeId.newInstance("host1", 2342);
+ amNodeMap.nodeSeen(nodeId);
+
+ NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+ amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
+ dispatcher.await();
+ assertEquals(AMNodeState.UNHEALTHY, amNodeMap.get(nodeId).getState());
+ dispatcher.stop();
+ }
+
+ @Test
+ @SuppressWarnings({ "resource", "rawtypes" })
+ public void testHealthUpdateUnknownNode() {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ EventHandler eventHandler = dispatcher.getEventHandler();
+
+ AppContext appContext = mock(AppContext.class);
+
+ AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+
+ NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
+
+ NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+ amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
+
+ // No exceptions - the status update was ignored. Not bothering to capture
+ // the log message for verification.
+ dispatcher.stop();
+ }
+
+ private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
+ NodeReport nodeReport = NodeReport.newInstance(nodeId, nodeState, nodeId.getHost() + ":3433",
+ "/default-rack", Resource.newInstance(0, 0), Resource.newInstance(10240, 12), 10,
+ nodeState.toString(), System.currentTimeMillis());
+ return nodeReport;
+ }
+}