You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/04/29 10:35:56 UTC

svn commit: r1097727 [5/5] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-common/src/main/ja...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,298 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestContainerLocalizer {
+
+  static final Path basedir =
+      new Path("target", TestContainerLocalizer.class.getName());
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testContainerLocalizerMain() throws Exception {
+    Configuration conf = new Configuration();
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    // don't actually create dirs
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    final String user = "yak";
+    final String appId = "app_RM_0";
+    final String cId = "container_0";
+    final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+    final Path logDir = lfs.makeQualified(new Path(basedir, "logs"));
+    final List<Path> localDirs = new ArrayList<Path>();
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+    }
+    RecordFactory mockRF = getMockLocalizerRecordFactory();
+    ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
+        appId, cId, logDir, localDirs,
+        new HashMap<LocalResource,Future<Path>>(), mockRF);
+    ContainerLocalizer localizer = spy(concreteLoc);
+
+    // return credential stream instead of opening local file
+    final Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    DataInputBuffer appTokens = createFakeCredentials(r, 10);
+    Path tokenPath =
+      lfs.makeQualified(new Path(
+            String.format(ContainerLocalizer.TOKEN_FILE_FMT, cId)));
+    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+        ).when(spylfs).open(tokenPath);
+
+    // mock heartbeat responses from NM
+    LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
+    LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+    LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+    LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
+    LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+    when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcA)))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcB)))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcC)))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcD)))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.<LocalResource>emptyList()))
+      .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
+            null));
+    doReturn(new FakeDownload(rsrcA.getResource().getFile(), true))
+      .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcA));
+    doReturn(new FakeDownload(rsrcB.getResource().getFile(), true))
+      .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcB));
+    doReturn(new FakeDownload(rsrcC.getResource().getFile(), true))
+      .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcC));
+    doReturn(new FakeDownload(rsrcD.getResource().getFile(), true))
+      .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcD));
+    doReturn(nmProxy).when(localizer).getProxy(nmAddr);
+    doNothing().when(localizer).sleep(anyInt());
+
+    // return result instantly for deterministic test
+    ExecutorService syncExec = mock(ExecutorService.class);
+    when(syncExec.submit(isA(Callable.class)))
+      .thenAnswer(new Answer<Future<Path>>() {
+          @Override
+          public Future<Path> answer(InvocationOnMock invoc)
+              throws Throwable {
+            Future<Path> done = mock(Future.class);
+            when(done.isDone()).thenReturn(true);
+            FakeDownload d = (FakeDownload) invoc.getArguments()[0];
+            when(done.get()).thenReturn(d.call());
+            return done;
+          }
+        });
+    doReturn(syncExec).when(localizer).createDownloadThreadPool();
+
+    // run localization
+    assertEquals(0, localizer.runLocalization(nmAddr));
+
+    // verify created cache, application dirs
+    for (Path p : localDirs) {
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
+      Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+      // $x/usercache/$user/filecache
+      verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(true));
+      Path appDir =
+        new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+      // $x/usercache/$user/appcache/$appId/filecache
+      Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+      verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(true));
+      // $x/usercache/$user/appcache/$appId/output
+      Path appOutput = new Path(appDir, ContainerLocalizer.OUTPUTDIR);
+      verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(true));
+    }
+
+    // verify tokens read at expected location
+    verify(spylfs).open(tokenPath);
+
+    // verify log dir creation
+    verify(spylfs).mkdir(eq(logDir), isA(FsPermission.class), anyBoolean());
+
+    // verify downloaded resources reported to NM
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+
+    // verify all HB use localizerID provided
+    verify(nmProxy, never()).heartbeat(argThat(
+        new ArgumentMatcher<LocalizerStatus>() {
+          @Override
+          public boolean matches(Object o) {
+            LocalizerStatus status = (LocalizerStatus) o;
+            return !cId.equals(status.getLocalizerId());
+          }
+        }));
+  }
+
+  static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
+    final LocalResource rsrc;
+    HBMatches(LocalResource rsrc) {
+      this.rsrc = rsrc;
+    }
+    @Override
+    public boolean matches(Object o) {
+      LocalizerStatus status = (LocalizerStatus) o;
+      for (LocalResourceStatus localized : status.getResources()) {
+        switch (localized.getStatus()) {
+        case FETCH_SUCCESS:
+          if (localized.getLocalPath().getFile().contains(
+                rsrc.getResource().getFile())) {
+            return true;
+          }
+          break;
+        default:
+          fail("Unexpected: " + localized.getStatus());
+          break;
+        }
+      }
+      return false;
+    }
+  }
+
+  static class FakeDownload implements Callable<Path> {
+    private final Path localPath;
+    private final boolean succeed;
+    FakeDownload(String absPath, boolean succeed) {
+      this.localPath = new Path("file:///localcache" + absPath);
+      this.succeed = succeed;
+    }
+    @Override
+    public Path call() throws IOException {
+      if (!succeed) {
+        throw new IOException("FAIL " + localPath);
+      }
+      return localPath;
+    }
+  }
+
+  static RecordFactory getMockLocalizerRecordFactory() {
+    RecordFactory mockRF = mock(RecordFactory.class);
+    when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))
+      .thenAnswer(new Answer<LocalResourceStatus>() {
+          @Override
+          public LocalResourceStatus answer(InvocationOnMock invoc)
+              throws Throwable {
+            return new MockLocalResourceStatus();
+          }
+        });
+    when(mockRF.newRecordInstance(same(LocalizerStatus.class)))
+      .thenAnswer(new Answer<LocalizerStatus>() {
+          @Override
+          public LocalizerStatus answer(InvocationOnMock invoc)
+              throws Throwable {
+            return new MockLocalizerStatus();
+          }
+        });
+    return mockRF;
+  }
+
+  static LocalResource getMockRsrc(Random r,
+      LocalResourceVisibility vis) {
+    LocalResource rsrc = mock(LocalResource.class);
+
+    String name = Long.toHexString(r.nextLong());
+    URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
+    when(uri.getScheme()).thenReturn("file");
+    when(uri.getHost()).thenReturn(null);
+    when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
+
+    when(rsrc.getResource()).thenReturn(uri);
+    when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
+    when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
+    when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
+    when(rsrc.getVisibility()).thenReturn(vis);
+
+    return rsrc;
+  }
+
+  static DataInputBuffer createFakeCredentials(Random r, int nTok)
+      throws IOException {
+    Credentials creds = new Credentials();
+    byte[] password = new byte[20];
+    Text kind = new Text();
+    Text service = new Text();
+    Text alias = new Text();
+    for (int i = 0; i < nTok; ++i) {
+      byte[] identifier = ("idef" + i).getBytes();
+      r.nextBytes(password);
+      kind.set("kind" + i);
+      service.set("service" + i);
+      alias.set("token" + i);
+      Token token = new Token(identifier, password, kind, service);
+      creds.addToken(alias, token);
+    }
+    DataOutputBuffer buf = new DataOutputBuffer();
+    creds.writeTokenStorageToStream(buf);
+    DataInputBuffer ret = new DataInputBuffer();
+    ret.reset(buf.getData(), 0, buf.getLength());
+    return ret;
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java Fri Apr 29 08:35:53 2011
@@ -20,15 +20,11 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -38,7 +34,9 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,8 +56,11 @@ public class TestFSDownload {
     fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true);
   }
 
-  static org.apache.hadoop.yarn.api.records.LocalResource createFile(FileContext files, Path p, int len, Random r)
-      throws IOException, URISyntaxException {
+  static final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  static LocalResource createFile(FileContext files, Path p, int len,
+      Random r) throws IOException, URISyntaxException {
     FSDataOutputStream out = null;
     try {
       byte[] bytes = new byte[len];
@@ -69,7 +70,7 @@ public class TestFSDownload {
     } finally {
       if (out != null) out.close();
     }
-    org.apache.hadoop.yarn.api.records.LocalResource ret = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
     ret.setSize(len);
     ret.setType(LocalResourceType.FILE);
@@ -87,39 +88,32 @@ public class TestFSDownload {
     files.mkdir(basedir, null, true);
     conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
 
-    Collection<FSDownload> pending = new ArrayList<FSDownload>();
     Random rand = new Random();
     long sharedSeed = rand.nextLong();
     rand.setSeed(sharedSeed);
     System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource,Future<Path>> pending =
+      new HashMap<LocalResource,Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());
     int[] sizes = new int[10];
     for (int i = 0; i < 10; ++i) {
       sizes[i] = rand.nextInt(512) + 512;
-      org.apache.hadoop.yarn.api.records.LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
+      LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
           sizes[i], rand);
       FSDownload fsd =
         new FSDownload(files, conf, dirs, rsrc, new Random(sharedSeed));
-      pending.add(fsd);
+      pending.put(rsrc, exec.submit(fsd));
     }
 
-    ExecutorService exec = Executors.newSingleThreadExecutor();
-    CompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> queue =
-      new ExecutorCompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>(exec);
     try {
-      for (FSDownload fsd : pending) {
-        queue.submit(fsd);
-      }
-      Map<org.apache.hadoop.yarn.api.records.LocalResource,Path> results = new HashMap();
-      for (int i = 0; i < 10; ++i) {
-        Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> result = queue.take();
-        results.putAll(result.get());
-      }
-      for (Map.Entry<org.apache.hadoop.yarn.api.records.LocalResource,Path> localized : results.entrySet()) {
+      for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
+        Path localized = p.getValue().get();
         assertEquals(
-            sizes[Integer.valueOf(localized.getValue().getName())],
-            localized.getKey().getSize() - 4096 - 16); // bad DU impl + .crc ; sigh
+          sizes[Integer.valueOf(localized.getName())],
+          p.getKey().getSize() - 4096 - 16); // bad DU impl + .crc ; sigh
       }
     } catch (ExecutionException e) {
       throw new IOException("Failed exec", e);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java Fri Apr 29 08:35:53 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
@@ -48,14 +48,14 @@ public class TestLocalResource {
     return ret;
   }
 
-  static void checkEqual(LocalResource a, LocalResource b) {
+  static void checkEqual(LocalResourceRequest a, LocalResourceRequest b) {
     assertEquals(a, b);
     assertEquals(a.hashCode(), b.hashCode());
     assertEquals(0, a.compareTo(b));
     assertEquals(0, b.compareTo(a));
   }
 
-  static void checkNotEqual(LocalResource a, LocalResource b) {
+  static void checkNotEqual(LocalResourceRequest a, LocalResourceRequest b) {
     assertFalse(a.equals(b));
     assertFalse(b.equals(a));
     assertFalse(a.hashCode() == b.hashCode());
@@ -75,40 +75,40 @@ public class TestLocalResource {
         new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
     org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
-    final LocalResource a = new LocalResource(yA);
-    LocalResource b = new LocalResource(yA);
+    final LocalResourceRequest a = new LocalResourceRequest(yA);
+    LocalResourceRequest b = new LocalResourceRequest(yA);
     checkEqual(a, b);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkEqual(a, b);
 
     // ignore visibility
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkEqual(a, b);
 
     // ignore size
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkEqual(a, b);
 
     // note path
     yB = getYarnResource(
         new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkNotEqual(a, b);
 
     // note type
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkNotEqual(a, b);
 
     // note timestamp
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     checkNotEqual(a, b);
   }
 
@@ -121,24 +121,24 @@ public class TestLocalResource {
     long basetime = r.nextLong() >>> 2;
     org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
         new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
-    final LocalResource a = new LocalResource(yA);
+    final LocalResourceRequest a = new LocalResourceRequest(yA);
 
     // Path primary
     org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
         new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
-    LocalResource b = new LocalResource(yB);
+    LocalResourceRequest b = new LocalResourceRequest(yB);
     assertTrue(0 > a.compareTo(b));
 
     // timestamp secondary
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     assertTrue(0 > a.compareTo(b));
 
     // type tertiary
     yB = getYarnResource(
         new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC);
-    b = new LocalResource(yB);
+    b = new LocalResourceRequest(yB);
     assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
   }
 

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Mockito.*;
+
+public class TestLocalizedResource {
+
+  static ContainerId getMockContainer(int id) {
+    ApplicationId appId = mock(ApplicationId.class);
+    when(appId.getClusterTimestamp()).thenReturn(314159265L);
+    when(appId.getId()).thenReturn(3);
+    ContainerId container = mock(ContainerId.class);
+    when(container.getId()).thenReturn(id);
+    when(container.getAppId()).thenReturn(appId);
+    return container;
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testNotification() throws Exception {
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(null);
+    try {
+      dispatcher.start();
+      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+      EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+      dispatcher.register(ContainerEventType.class, containerBus);
+      dispatcher.register(LocalizerEventType.class, localizerBus);
+
+      // mock resource
+      LocalResource apiRsrc = createMockResource();
+
+      final ContainerId container0 = getMockContainer(0);
+      final Credentials creds0 = new Credentials();
+      final LocalResourceVisibility vis0 = LocalResourceVisibility.PRIVATE;
+      final LocalizerContext ctxt0 =
+        new LocalizerContext("yak", container0, creds0);
+      LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
+      LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
+      local.handle(new ResourceRequestEvent(rsrcA, vis0, ctxt0));
+      dispatcher.await();
+
+      // Register C0, verify request event
+      LocalizerEventMatcher matchesL0Req =
+        new LocalizerEventMatcher(container0, creds0, vis0,
+            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+      verify(localizerBus).handle(argThat(matchesL0Req));
+      assertEquals(ResourceState.DOWNLOADING, local.getState());
+
+      // Register C1, verify request event
+      final Credentials creds1 = new Credentials();
+      final ContainerId container1 = getMockContainer(1);
+      final LocalizerContext ctxt1 =
+        new LocalizerContext("yak", container1, creds1);
+      final LocalResourceVisibility vis1 = LocalResourceVisibility.PUBLIC;
+      local.handle(new ResourceRequestEvent(rsrcA, vis1, ctxt1));
+      dispatcher.await();
+      LocalizerEventMatcher matchesL1Req =
+        new LocalizerEventMatcher(container1, creds1, vis1,
+            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+      verify(localizerBus).handle(argThat(matchesL1Req));
+
+      // Release C0 container localization, verify no notification
+      local.handle(new ResourceReleaseEvent(rsrcA, container0));
+      dispatcher.await();
+      verify(containerBus, never()).handle(isA(ContainerEvent.class));
+      assertEquals(ResourceState.DOWNLOADING, local.getState());
+
+      // Release C1 container localization, verify no notification
+      local.handle(new ResourceReleaseEvent(rsrcA, container1));
+      dispatcher.await();
+      verify(containerBus, never()).handle(isA(ContainerEvent.class));
+      assertEquals(ResourceState.INIT, local.getState());
+
+      // Register C2, C3
+      final ContainerId container2 = getMockContainer(2);
+      final LocalResourceVisibility vis2 = LocalResourceVisibility.PRIVATE;
+      final Credentials creds2 = new Credentials();
+      final LocalizerContext ctxt2 =
+        new LocalizerContext("yak", container2, creds2);
+
+      final ContainerId container3 = getMockContainer(3);
+      final LocalResourceVisibility vis3 = LocalResourceVisibility.PRIVATE;
+      final Credentials creds3 = new Credentials();
+      final LocalizerContext ctxt3 =
+        new LocalizerContext("yak", container3, creds3);
+
+      local.handle(new ResourceRequestEvent(rsrcA, vis2, ctxt2));
+      local.handle(new ResourceRequestEvent(rsrcA, vis3, ctxt3));
+      dispatcher.await();
+      LocalizerEventMatcher matchesL2Req =
+        new LocalizerEventMatcher(container2, creds2, vis2,
+            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+      verify(localizerBus).handle(argThat(matchesL2Req));
+      LocalizerEventMatcher matchesL3Req =
+        new LocalizerEventMatcher(container3, creds3, vis3,
+            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+      verify(localizerBus).handle(argThat(matchesL3Req));
+
+      // Successful localization. verify notification C2, C3
+      Path locA = new Path("file:///cache/rsrcA");
+      local.handle(new ResourceLocalizedEvent(rsrcA, locA, 10));
+      dispatcher.await();
+      ContainerEventMatcher matchesC2Localized =
+        new ContainerEventMatcher(container2,
+            ContainerEventType.RESOURCE_LOCALIZED);
+      ContainerEventMatcher matchesC3Localized =
+        new ContainerEventMatcher(container3,
+            ContainerEventType.RESOURCE_LOCALIZED);
+      verify(containerBus).handle(argThat(matchesC2Localized));
+      verify(containerBus).handle(argThat(matchesC3Localized));
+      assertEquals(ResourceState.LOCALIZED, local.getState());
+
+      // Register C4, verify notification
+      final ContainerId container4 = getMockContainer(4);
+      final Credentials creds4 = new Credentials();
+      final LocalizerContext ctxt4 =
+        new LocalizerContext("yak", container4, creds4);
+      final LocalResourceVisibility vis4 = LocalResourceVisibility.PRIVATE;
+      local.handle(new ResourceRequestEvent(rsrcA, vis4, ctxt4));
+      dispatcher.await();
+      ContainerEventMatcher matchesC4Localized =
+        new ContainerEventMatcher(container4,
+            ContainerEventType.RESOURCE_LOCALIZED);
+      verify(containerBus).handle(argThat(matchesC4Localized));
+      assertEquals(ResourceState.LOCALIZED, local.getState());
+    } finally {
+      dispatcher.stop();
+    }
+  }
+
+  @Test
+  public void testDirectLocalization() throws Exception {
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(null);
+    try {
+      dispatcher.start();
+      LocalResource apiRsrc = createMockResource();
+      LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
+      LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
+      Path p = new Path("file:///cache/rsrcA");
+      local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
+      dispatcher.await();
+      assertEquals(ResourceState.LOCALIZED, local.getState());
+    } finally {
+      dispatcher.stop();
+    }
+  }
+
+  static LocalResource createMockResource() {
+    // mock rsrc location
+    org.apache.hadoop.yarn.api.records.URL uriA =
+      mock(org.apache.hadoop.yarn.api.records.URL.class);
+    when(uriA.getScheme()).thenReturn("file");
+    when(uriA.getHost()).thenReturn(null);
+    when(uriA.getFile()).thenReturn("/localA/rsrc");
+
+    LocalResource apiRsrc = mock(LocalResource.class);
+    when(apiRsrc.getResource()).thenReturn(uriA);
+    when(apiRsrc.getTimestamp()).thenReturn(4344L);
+    when(apiRsrc.getType()).thenReturn(LocalResourceType.FILE);
+    return apiRsrc;
+  }
+
+
+  static class LocalizerEventMatcher extends ArgumentMatcher<LocalizerEvent> {
+    Credentials creds;
+    LocalResourceVisibility vis;
+    private final ContainerId idRef;
+    private final LocalizerEventType type;
+
+    public LocalizerEventMatcher(ContainerId idRef, Credentials creds,
+        LocalResourceVisibility vis, LocalizerEventType type) {
+      this.vis = vis;
+      this.type = type;
+      this.creds = creds;
+      this.idRef = idRef;
+    }
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof LocalizerResourceRequestEvent)) return false;
+      LocalizerResourceRequestEvent evt = (LocalizerResourceRequestEvent) o;
+      return idRef == evt.getContext().getContainer()
+          && type == evt.getType()
+          && vis == evt.getVisibility()
+          && creds == evt.getContext().getCredentials();
+    }
+  }
+
+  static class ContainerEventMatcher extends ArgumentMatcher<ContainerEvent> {
+    private final ContainerId idRef;
+    private final ContainerEventType type;
+    public ContainerEventMatcher(ContainerId idRef, ContainerEventType type) {
+      this.idRef = idRef;
+      this.type = type;
+    }
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof ContainerEvent)) return false;
+      ContainerEvent evt = (ContainerEvent) o;
+      return idRef == evt.getContainerID() && type == evt.getType();
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,80 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.mockito.Mockito.*;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
+
+public class TestResourceLocalizationService {
+
+  static final Path basedir =
+      new Path("target", TestResourceLocalizationService.class.getName());
+
+  @Test
+  public void testLocalizationInit() throws Exception {
+    final Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(null);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    DeletionService delService = spy(new DeletionService(exec));
+    delService.init(null);
+    delService.start();
+
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+    ResourceLocalizationService locService =
+      spy(new ResourceLocalizationService(dispatcher, exec, delService));
+    doReturn(lfs)
+      .when(locService).getLocalFileContext(isA(Configuration.class));
+    try {
+      dispatcher.start();
+      List<Path> localDirs = new ArrayList<Path>();
+      String[] sDirs = new String[4];
+      for (int i = 0; i < 4; ++i) {
+        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+        sDirs[i] = localDirs.get(i).toString();
+      }
+      conf.setStrings(NM_LOCAL_DIR, sDirs);
+
+      // initialize ResourceLocalizationService
+      locService.init(conf);
+
+      // verify directory creation
+      for (Path p : localDirs) {
+        Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
+        verify(spylfs)
+          .mkdir(eq(usercache), isA(FsPermission.class), eq(true));
+        Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
+        verify(spylfs)
+          .mkdir(eq(publicCache), isA(FsPermission.class), eq(true));
+        Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
+        verify(spylfs).mkdir(eq(nmPriv),
+            eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
+      }
+    } finally {
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Fri Apr 29 08:35:53 2011
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.util.Build
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
+import static org.mockito.Mockito.*;
 
 public class TestNMWebServer {
 
@@ -84,16 +84,12 @@ public class TestNMWebServer {
     Dispatcher dispatcher = new AsyncDispatcher();
     String user = "nobody";
     long clusterTimeStamp = 1234;
-    Map<String, String> env = new HashMap<String, String>();
-    Map<String, LocalResource> resources =
-        new HashMap<String, LocalResource>();
-    ByteBuffer containerTokens = ByteBuffer.allocate(0);
     ApplicationId appId =
         BuilderUtils.newApplicationId(recordFactory, clusterTimeStamp, 1);
-      Application app =
-          new ApplicationImpl(dispatcher, user, appId, env, resources,
-              containerTokens);
-      nmContext.getApplications().put(appId, app);
+    Application app = mock(Application.class);
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    nmContext.getApplications().put(appId, app);
     ContainerId container1 =
         BuilderUtils.newContainerId(recordFactory, appId, 0);
     ContainerId container2 =

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Fri Apr 29 08:35:53 2011
@@ -24,15 +24,16 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -156,12 +157,8 @@ public class MiniYARNCluster extends Com
           };
 
           @Override
-          protected
-              org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater
-              createNodeStatusUpdater(
-                  org.apache.hadoop.yarn.server.nodemanager.Context context,
-                  Dispatcher dispatcher,
-                  NodeHealthCheckerService healthChecker) {
+          protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+              Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
             return new NodeStatusUpdaterImpl(context, dispatcher,
                 healthChecker) {
               @Override