You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/02/20 16:10:58 UTC

hadoop git commit: YARN-3194. RM should handle NMContainerStatuses sent by NM while registering if NM is Reconnected node. Contributed by Rohith

Repository: hadoop
Updated Branches:
  refs/heads/trunk 7ae5255a1 -> a64dd3d24


YARN-3194. RM should handle NMContainerStatuses sent by NM while registering if NM is Reconnected node. Contributed by Rohith


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a64dd3d2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a64dd3d2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a64dd3d2

Branch: refs/heads/trunk
Commit: a64dd3d24bfcb9af21eb63869924f6482b147fd3
Parents: 7ae5255
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 20 15:08:48 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 20 15:10:10 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/ResourceTrackerService.java |   9 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      | 111 ++++++++++-------
 .../rmnode/RMNodeReconnectEvent.java            |   9 +-
 .../resourcemanager/TestApplicationCleanup.java | 121 +++++++++++++++++++
 .../resourcemanager/TestRMNodeTransitions.java  |   4 +-
 6 files changed, 209 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cac6680..8ec2409 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -626,6 +626,9 @@ Release 2.7.0 - UNRELEASED
     YARN-933. Fixed InvalidStateTransitonException at FINAL_SAVING state in
     RMApp. (Rohith Sharmaks via jianhe)
 
+    YARN-3194. RM should handle NMContainerStatuses sent by NM while
+    registering if NM is Reconnected node (Rohith via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 61a0349..0de556b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -312,9 +312,12 @@ public class ResourceTrackerService extends AbstractService implements
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode,
-              request.getRunningApplications()));
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeReconnectEvent(nodeId, rmNode, request
+                  .getRunningApplications(), request.getNMContainerStatuses()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1bc98b2..9701775 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -601,6 +601,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         rmNode.httpAddress = newNode.getHttpAddress();
         rmNode.totalCapability = newNode.getTotalCapability();
       
+        handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
+
         // Reset heartbeat ID since node just restarted.
         rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
       }
@@ -622,6 +624,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       }
       
     }
+
+    private void handleNMContainerStatus(
+        List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
+      List<ContainerStatus> containerStatuses =
+          new ArrayList<ContainerStatus>();
+      for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+        containerStatuses.add(createContainerStatus(nmContainerStatus));
+      }
+      rmnode.handleContainerStatus(containerStatuses);
+    }
+
+    private ContainerStatus createContainerStatus(
+        NMContainerStatus remoteContainer) {
+      ContainerStatus cStatus =
+          ContainerStatus.newInstance(remoteContainer.getContainerId(),
+              remoteContainer.getContainerState(),
+              remoteContainer.getDiagnostics(),
+              remoteContainer.getContainerExitStatus());
+      return cStatus;
+    }
   }
   
   public static class UpdateNodeResourceWhenRunningTransition
@@ -747,49 +769,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         return NodeState.UNHEALTHY;
       }
 
-      // Filter the map to only obtain just launched containers and finished
-      // containers.
-      List<ContainerStatus> newlyLaunchedContainers = 
-          new ArrayList<ContainerStatus>();
-      List<ContainerStatus> completedContainers = 
-          new ArrayList<ContainerStatus>();
-      for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
-        ContainerId containerId = remoteContainer.getContainerId();
-
-        // Don't bother with containers already scheduled for cleanup, or for
-        // applications already killed. The scheduler doens't need to know any
-        // more about this container
-        if (rmNode.containersToClean.contains(containerId)) {
-          LOG.info("Container " + containerId + " already scheduled for " +
-          		"cleanup, no further processing");
-          continue;
-        }
-        if (rmNode.finishedApplications.contains(containerId
-            .getApplicationAttemptId().getApplicationId())) {
-          LOG.info("Container " + containerId
-              + " belongs to an application that is already killed,"
-              + " no further processing");
-          continue;
-        }
+      rmNode.handleContainerStatus(statusEvent.getContainers());
 
-        // Process running containers
-        if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.launchedContainers.contains(containerId)) {
-            // Just launched container. RM knows about it the first time.
-            rmNode.launchedContainers.add(containerId);
-            newlyLaunchedContainers.add(remoteContainer);
-          }
-        } else {
-          // A finished container
-          rmNode.launchedContainers.remove(containerId);
-          completedContainers.add(remoteContainer);
-        }
-      }
-      if(newlyLaunchedContainers.size() != 0 
-          || completedContainers.size() != 0) {
-        rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
-            (newlyLaunchedContainers, completedContainers));
-      }
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
@@ -874,4 +855,50 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
     return nlm.getLabelsOnNode(nodeId);
   }
+
+  private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
+    // Filter the map to only obtain just launched containers and finished
+    // containers.
+    List<ContainerStatus> newlyLaunchedContainers =
+        new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers =
+        new ArrayList<ContainerStatus>();
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      ContainerId containerId = remoteContainer.getContainerId();
+
+      // Don't bother with containers already scheduled for cleanup, or for
+      // applications already killed. The scheduler doesn't need to know any
+      // more about this container
+      if (containersToClean.contains(containerId)) {
+        LOG.info("Container " + containerId + " already scheduled for "
+            + "cleanup, no further processing");
+        continue;
+      }
+      if (finishedApplications.contains(containerId.getApplicationAttemptId()
+          .getApplicationId())) {
+        LOG.info("Container " + containerId
+            + " belongs to an application that is already killed,"
+            + " no further processing");
+        continue;
+      }
+
+      // Process running containers
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        if (!launchedContainers.contains(containerId)) {
+          // Just launched container. RM knows about it the first time.
+          launchedContainers.add(containerId);
+          newlyLaunchedContainers.add(remoteContainer);
+        }
+      } else {
+        // A finished container
+        launchedContainers.remove(containerId);
+        completedContainers.add(remoteContainer);
+      }
+    }
+    if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
+      nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
+          completedContainers));
+    }
+  }
+
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
index ebbac9a..0bea44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
@@ -22,16 +22,19 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
   private List<ApplicationId> runningApplications;
+  private List<NMContainerStatus> containerStatuses;
 
   public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
-      List<ApplicationId> runningApps) {
+      List<ApplicationId> runningApps, List<NMContainerStatus> containerReports) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
     runningApplications = runningApps;
+    containerStatuses = containerReports;
   }
 
   public RMNode getReconnectedNode() {
@@ -41,4 +44,8 @@ public class RMNodeReconnectEvent extends RMNodeEvent {
   public List<ApplicationId> getRunningApplications() {
     return runningApplications;
   }
+
+  public List<NMContainerStatus> getNMContainerStatuses() {
+    return containerStatuses;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 891130f..6e08aeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -28,7 +28,9 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
@@ -478,6 +481,124 @@ public class TestApplicationCleanup {
     rm1.stop();
   }
 
+  // The test verifies processing of NMContainerStatuses which are sent during
+  // NM registration.
+  // 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM
+  // 2. AM sends ResourceRequest for 1 container with memory 2048MB.
+  // 3. Verify the number of containers allocated by RM
+  // 4. Verify Memory Usage by cluster, it should be 3072. AM memory + requested
+  // memory. 1024 + 2048=3072
+  // 5. Re-register NM by sending completed container status
+  // 6. Verify for Memory Used, it should be 1024
+  // 7. Send AM heartbeat to RM. Allocated response should contain completed
+  // container.
+  @Test(timeout = 60000)
+  public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    int nmMemory = 8192;
+    int amMemory = 1024;
+    int containerMemory = 2048;
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app0 = rm1.submitApp(amMemory);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    // 2. AM sends ResourceRequest for 1 container with memory 2048MB.
+    int noOfContainers = 1;
+    List<Container> allocateContainers =
+        am0.allocateAndWaitForContainers(noOfContainers, containerMemory, nm1);
+
+    // 3. Verify the number of containers allocated by RM
+    Assert.assertEquals(noOfContainers, allocateContainers.size());
+    Container container = allocateContainers.get(0);
+
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId()
+        .getContainerId(), ContainerState.RUNNING);
+
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // 4. Verify Memory Usage by cluster, it should be 3072. AM memory +
+    // requested memory. 1024 + 2048=3072
+    ResourceScheduler rs = rm1.getRMContext().getScheduler();
+    int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
+    Assert.assertEquals(amMemory + containerMemory, allocatedMB);
+
+    // 5. Re-register NM by sending completed container status
+    List<NMContainerStatus> nMContainerStatusForApp =
+        createNMContainerStatusForApp(am0);
+    nm1.registerNode(nMContainerStatusForApp,
+        Arrays.asList(app0.getApplicationId()));
+
+    waitForClusterMemory(nm1, rs, amMemory);
+
+    // 6. Verify for Memory Used, it should be 1024
+    Assert.assertEquals(amMemory, rs.getRootQueueMetrics().getAllocatedMB());
+
+    // 7. Send AM heartbeat to RM. Allocated response should contain completed
+    // container
+    AllocateRequest req =
+        AllocateRequest.newInstance(0, 0F, new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>(), null);
+    AllocateResponse allocate = am0.allocate(req);
+    List<ContainerStatus> completedContainersStatuses =
+        allocate.getCompletedContainersStatuses();
+    Assert.assertEquals(noOfContainers, completedContainersStatuses.size());
+
+    // Application clean up should happen; cluster memory used should be 0
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    waitForClusterMemory(nm1, rs, 0);
+
+    rm1.stop();
+  }
+
+  private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs,
+      int clusterMemory) throws Exception, InterruptedException {
+    int counter = 0;
+    while (rs.getRootQueueMetrics().getAllocatedMB() != clusterMemory) {
+      nm1.nodeHeartbeat(true);
+
+      Thread.sleep(100);
+      if (counter++ == 50) {
+        Assert.fail("Wait for cluster memory is timed out.Expected="
+            + clusterMemory + " Actual="
+            + rs.getRootQueueMetrics().getAllocatedMB());
+      }
+    }
+  }
+
+  public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
+    List<NMContainerStatus> list = new ArrayList<NMContainerStatus>();
+    NMContainerStatus amContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 1,
+            ContainerState.RUNNING, 1024);
+    NMContainerStatus completedContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 2,
+            ContainerState.COMPLETE, 2048);
+    list.add(amContainer);
+    list.add(completedContainer);
+    return list;
+  }
+
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+      int memory) {
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+            Resource.newInstance(memory, 1), "recover container", 0,
+            Priority.newInstance(0), 0);
+    return containerReport;
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index d877e25..c6da3fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -540,7 +540,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
     Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -614,7 +614,7 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
     RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
     node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
-        null));
+        null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
 }