You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2014/12/02 23:07:46 UTC

incubator-reef git commit: [REEF-53]: Currently there is no interface for NameServer. And all the parameters of the NameServer have default values. This means as long as a client puts NameServer in the Driver constructor, no matter it is binded or not, i

Repository: incubator-reef
Updated Branches:
  refs/heads/master 37c2e5c17 -> 7492d26b5


[REEF-53]: Currently there is no interface for NameServer, and all the parameters of
the NameServer have default values. This means that as long as a client puts NameServer in the
Driver constructor, it will be started whether or not it is bound.

This introduces a NameServer interface and renames the original NameServer to NameServerImpl,
so that if NameServer is not in a Driver constructor and clients do not bind NameServerImpl
into the Driver configuration, NameServer will not be started.

- Introduce the NameServer interface and add a default name server implementation
- Remove DefaultNameServerImpl

Author: Julia Wang jwang98052@yahoo.com

JIRA:
  [REEF-53]: https://issues.apache.org/jira/browse/REEF-53

Pull Request:
  Closes #27


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/7492d26b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/7492d26b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/7492d26b

Branch: refs/heads/master
Commit: 7492d26b5f295fcba4a746b353318ce234aea0cc
Parents: 37c2e5c
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Nov 25 16:18:38 2014 -0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Wed Dec 3 07:00:02 2014 +0900

----------------------------------------------------------------------
 .../reef/io/network/naming/NameServer.java      | 233 +-------------
 .../network/naming/NameServerConfiguration.java |   2 +
 .../reef/io/network/naming/NameServerImpl.java  | 300 +++++++++++++++++++
 .../reef/services/network/NameClientTest.java   |   9 +-
 .../reef/services/network/NamingTest.java       |   8 +-
 .../services/network/NetworkServiceTest.java    |  15 +-
 6 files changed, 331 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
index 4c1a8ac..5a4c765 100644
--- a/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
+++ b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
@@ -16,10 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.NameAssignment;
 import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
@@ -43,101 +45,15 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
- * Naming server
+ * Naming server interface
  */
-public class NameServer implements Stage {
-
-  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
-
-  private final Transport transport;
-  private final Map<Identifier, InetSocketAddress> idToAddrMap;
-  private final ReefEventStateManager reefEventStateManager;
-  private final int port;
-
-  /**
-   * @param port    a listening port number
-   * @param factory an identifier factory
-   * @deprecated inject the NameServer instead of new it up
-   * Constructs a name server
-   */
-  // TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
-  @Deprecated
-  public NameServer(
-      final int port,
-      final IdentifierFactory factory) {
-
-    this.reefEventStateManager = null;
-    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
-    final EventHandler<NamingMessage> handler = createEventHandler(codec);
-
-    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
-        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
-
-    this.port = transport.getListeningPort();
-    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
-
-    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
-  }
-
-
-  /**
-   * Constructs a name server
-   *
-   * @param port                  a listening port number
-   * @param factory               an identifier factory
-   * @param reefEventStateManager the event state manager used to register name server info
-   */
-  @Inject
-  public NameServer(
-      final @Parameter(NameServerParameters.NameServerPort.class) int port,
-      final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
-      final ReefEventStateManager reefEventStateManager) {
-
-    this.reefEventStateManager = reefEventStateManager;
-    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
-    final EventHandler<NamingMessage> handler = createEventHandler(codec);
-
-    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
-        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
-
-    this.port = transport.getListeningPort();
-    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
-
-    this.reefEventStateManager.registerServiceInfo(
-        AvroReefServiceInfo.newBuilder()
-            .setServiceName("NameServer")
-            .setServiceInfo(getNameServerId())
-            .build());
-    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
-  }
-
-  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
-
-    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
-        clazzToHandlerMap = new HashMap<>();
-
-    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
-    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
-    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
-    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
-
-    return handler;
-  }
+public interface NameServer extends Stage {
 
   /**
-   * Gets port
+   * get port number
+   * @return
    */
-  public int getPort() {
-    return port;
-  }
-
-  /**
-   * Closes resources
-   */
-  @Override
-  public void close() throws Exception {
-    transport.close();
-  }
+  public int getPort();
 
   /**
    * Registers an (identifier, address) mapping locally
@@ -145,20 +61,14 @@ public class NameServer implements Stage {
    * @param id   an identifier
    * @param addr an Internet socket address
    */
-  public void register(final Identifier id, final InetSocketAddress addr) {
-    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
-    idToAddrMap.put(id, addr);
-  }
+  public void register(final Identifier id, final InetSocketAddress addr);
 
   /**
    * Unregisters an identifier locally
    *
    * @param id an identifier
    */
-  public void unregister(final Identifier id) {
-    LOG.log(Level.FINE, "id: " + id);
-    idToAddrMap.remove(id);
-  }
+  public void unregister(final Identifier id);
 
   /**
    * Finds an address for an identifier locally
@@ -166,130 +76,13 @@ public class NameServer implements Stage {
    * @param id an identifier
    * @return an Internet socket address
    */
-  public InetSocketAddress lookup(final Identifier id) {
-    LOG.log(Level.FINE, "id: {0}", id);
-    return idToAddrMap.get(id);
-  }
+  public InetSocketAddress lookup(final Identifier id);
 
   /**
    * Finds addresses for identifiers locally
    *
-   * @param identifiers an iterable of identifiers
+   * @param identifiers an Iterable of identifiers
    * @return a list of name assignments
    */
-  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
-    LOG.log(Level.FINE, "identifiers");
-    final List<NameAssignment> nas = new ArrayList<>();
-    for (final Identifier id : identifiers) {
-      final InetSocketAddress addr = idToAddrMap.get(id);
-      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
-      if (addr != null) {
-        nas.add(new NameAssignmentTuple(id, addr));
-      }
-    }
-    return nas;
-  }
-
-  private String getNameServerId() {
-    return NetUtils.getLocalAddress() + ":" + getPort();
-  }
-}
-
-/**
- * Naming server transport event handler that invokes a specific naming message handler
- */
-class NamingServerHandler implements EventHandler<TransportEvent> {
-
-  private final Codec<NamingMessage> codec;
-  private final EventHandler<NamingMessage> handler;
-
-  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
-    this.codec = codec;
-    this.handler = handler;
-  }
-
-  @Override
-  public void onNext(final TransportEvent value) {
-    final byte[] data = value.getData();
-    final NamingMessage message = codec.decode(data);
-    message.setLink(value.getLink());
-    handler.onNext(message);
-  }
-}
-
-/**
- * Naming lookup request handler
- */
-class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
-
-  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
-
-
-  private final NameServer server;
-  private final Codec<NamingMessage> codec;
-
-  public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
-    this.server = server;
-    this.codec = codec;
-  }
-
-  @Override
-  public void onNext(final NamingLookupRequest value) {
-    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
-    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
-    try {
-      value.getLink().write(resp);
-    } catch (final IOException e) {
-      //Actually, there is no way Link.write can throw and IOException
-      //after netty4 merge. This needs to cleaned up
-      LOG.throwing("NamingLookupRequestHandler", "onNext", e);
-    }
-  }
-}
-
-/**
- * Naming register request handler
- */
-class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
-
-  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
-
-
-  private final NameServer server;
-  private final Codec<NamingMessage> codec;
-
-  public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
-    this.server = server;
-    this.codec = codec;
-  }
-
-  @Override
-  public void onNext(final NamingRegisterRequest value) {
-    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
-    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
-    try {
-      value.getLink().write(resp);
-    } catch (final IOException e) {
-      //Actually, there is no way Link.write can throw and IOException
-      //after netty4 merge. This needs to cleaned up
-      LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
-    }
-  }
-}
-
-/**
- * Naming unregister request handler
- */
-class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
-
-  private final NameServer server;
-
-  public NamingUnregisterRequestHandler(final NameServer server) {
-    this.server = server;
-  }
-
-  @Override
-  public void onNext(final NamingUnregisterRequest value) {
-    server.unregister(value.getIdentifier());
-  }
-}
+  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
index f3b7eec..1912686 100644
--- a/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
+++ b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.reef.io.network.naming;
 
 import org.apache.reef.tang.formats.ConfigurationModule;
@@ -45,5 +46,6 @@ public final class NameServerConfiguration extends ConfigurationModuleBuilder {
       .bindNamedParameter(NameServerParameters.NameServerPort.class, NAME_SERVICE_PORT)
       .bindNamedParameter(NameServerParameters.NameServerAddr.class, NAME_SERVER_HOSTNAME)
       .bindNamedParameter(NameServerParameters.NameServerIdentifierFactory.class, NAME_SERVER_IDENTIFIER_FACTORY)
+      .bindImplementation(NameServer.class, NameServerImpl.class)
       .build();
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
new file mode 100644
index 0000000..306682a
--- /dev/null
+++ b/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.MultiEventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.webserver.AvroReefServiceInfo;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server implementation
+ */
+public class NameServerImpl implements NameServer {
+
+  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
+
+  private final Transport transport;
+  private final Map<Identifier, InetSocketAddress> idToAddrMap;
+  private final ReefEventStateManager reefEventStateManager;
+  private final int port;
+
+  /**
+   * @param port    a listening port number
+   * @param factory an identifier factory
+   * @deprecated inject the NameServer instead of new it up
+   * Constructs a name server
+   */
+  // TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
+  @Deprecated
+  public NameServerImpl(
+      final int port,
+      final IdentifierFactory factory) {
+
+    this.reefEventStateManager = null;
+    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+    final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+    this.port = transport.getListeningPort();
+    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+  }
+
+
+  /**
+   * Constructs a name server
+   *
+   * @param port                  a listening port number
+   * @param factory               an identifier factory
+   * @param reefEventStateManager the event state manager used to register name server info
+   */
+  @Inject
+  public NameServerImpl(
+      final @Parameter(NameServerParameters.NameServerPort.class) int port,
+      final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
+      final ReefEventStateManager reefEventStateManager) {
+
+    this.reefEventStateManager = reefEventStateManager;
+    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+    final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+    this.port = transport.getListeningPort();
+    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+    this.reefEventStateManager.registerServiceInfo(
+        AvroReefServiceInfo.newBuilder()
+            .setServiceName("NameServer")
+            .setServiceInfo(getNameServerId())
+            .build());
+    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+  }
+
+  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
+
+    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
+        clazzToHandlerMap = new HashMap<>();
+
+    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
+    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
+    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
+    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
+
+    return handler;
+  }
+
+  /**
+   * Gets port
+   */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Closes resources
+   */
+  @Override
+  public void close() throws Exception {
+    transport.close();
+  }
+
+  /**
+   * Registers an (identifier, address) mapping locally
+   *
+   * @param id   an identifier
+   * @param addr an Internet socket address
+   */
+  @Override
+  public void register(final Identifier id, final InetSocketAddress addr) {
+    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
+    idToAddrMap.put(id, addr);
+  }
+
+  /**
+   * Unregisters an identifier locally
+   *
+   * @param id an identifier
+   */
+  @Override
+  public void unregister(final Identifier id) {
+    LOG.log(Level.FINE, "id: " + id);
+    idToAddrMap.remove(id);
+  }
+
+  /**
+   * Finds an address for an identifier locally
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   */
+  @Override
+  public InetSocketAddress lookup(final Identifier id) {
+    LOG.log(Level.FINE, "id: {0}", id);
+    return idToAddrMap.get(id);
+  }
+
+  /**
+   * Finds addresses for identifiers locally
+   *
+   * @param identifiers an iterable of identifiers
+   * @return a list of name assignments
+   */
+  @Override
+  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
+    LOG.log(Level.FINE, "identifiers");
+    final List<NameAssignment> nas = new ArrayList<>();
+    for (final Identifier id : identifiers) {
+      final InetSocketAddress addr = idToAddrMap.get(id);
+      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
+      if (addr != null) {
+        nas.add(new NameAssignmentTuple(id, addr));
+      }
+    }
+    return nas;
+  }
+
+  private String getNameServerId() {
+    return NetUtils.getLocalAddress() + ":" + getPort();
+  }
+}
+
+/**
+ * Naming server transport event handler that invokes a specific naming message handler
+ */
+class NamingServerHandler implements EventHandler<TransportEvent> {
+
+  private final Codec<NamingMessage> codec;
+  private final EventHandler<NamingMessage> handler;
+
+  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
+    this.codec = codec;
+    this.handler = handler;
+  }
+
+  @Override
+  public void onNext(final TransportEvent value) {
+    final byte[] data = value.getData();
+    final NamingMessage message = codec.decode(data);
+    message.setLink(value.getLink());
+    handler.onNext(message);
+  }
+}
+
+/**
+ * Naming lookup request handler
+ */
+class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
+
+  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
+
+
+  private final NameServer server;
+  private final Codec<NamingMessage> codec;
+
+  public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+    this.server = server;
+    this.codec = codec;
+  }
+
+  @Override
+  public void onNext(final NamingLookupRequest value) {
+    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
+    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
+    try {
+      value.getLink().write(resp);
+    } catch (final IOException e) {
+      //Actually, there is no way Link.write can throw and IOException
+      //after netty4 merge. This needs to cleaned up
+      LOG.throwing("NamingLookupRequestHandler", "onNext", e);
+    }
+  }
+}
+
+/**
+ * Naming register request handler
+ */
+class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
+
+  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
+
+
+  private final NameServer server;
+  private final Codec<NamingMessage> codec;
+
+  public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+    this.server = server;
+    this.codec = codec;
+  }
+
+  @Override
+  public void onNext(final NamingRegisterRequest value) {
+    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
+    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
+    try {
+      value.getLink().write(resp);
+    } catch (final IOException e) {
+      //Actually, there is no way Link.write can throw and IOException
+      //after netty4 merge. This needs to cleaned up
+      LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
+    }
+  }
+}
+
+/**
+ * Naming unregister request handler
+ */
+class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
+
+  private final NameServer server;
+
+  public NamingUnregisterRequestHandler(final NameServer server) {
+    this.server = server;
+  }
+
+  @Override
+  public void onNext(final NamingUnregisterRequest value) {
+    server.unregister(value.getIdentifier());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
index 98c7146..f1f1a60 100644
--- a/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -18,10 +18,7 @@
  */
 package org.apache.reef.services.network;
 
-import org.apache.reef.io.network.naming.NameCache;
-import org.apache.reef.io.network.naming.NameClient;
-import org.apache.reef.io.network.naming.NameLookupClient;
-import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.*;
 import org.apache.reef.io.network.naming.exception.NamingException;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.tang.Tang;
@@ -73,7 +70,7 @@ public class NameClientTest {
   @Test
   public final void testClose() throws Exception {
     IdentifierFactory factory = new StringIdentifierFactory();
-    try (NameServer server = new NameServer(0, factory)) {
+    try (NameServer server = new NameServerImpl(0, factory)) {
       int serverPort = server.getPort();
       try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
           new NameCache(10000))) {
@@ -95,7 +92,7 @@ public class NameClientTest {
   @Test
   public final void testLookup() throws Exception {
     IdentifierFactory factory = new StringIdentifierFactory();
-    try (NameServer server = new NameServer(0, factory)) {
+    try (NameServer server = new NameServerImpl(0, factory)) {
       int serverPort = server.getPort();
       try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
           new NameCache(150))) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
index cecfd24..71382e2 100644
--- a/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -83,7 +83,7 @@ public class NamingTest {
     idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
 
     // run a server
-    final NameServer server = new NameServer(0, this.factory);
+    final NameServer server = new NameServerImpl(0, this.factory);
     this.port = server.getPort();
     for (final Identifier id : idToAddrMap.keySet()) {
       server.register(id, idToAddrMap.get(id));
@@ -134,7 +134,7 @@ public class NamingTest {
       idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
 
       // run a server
-      final NameServer server = new NameServer(0, this.factory);
+      final NameServer server = new NameServerImpl(0, this.factory);
       this.port = server.getPort();
       for (final Identifier id : idToAddrMap.keySet()) {
         server.register(id, idToAddrMap.get(id));
@@ -217,7 +217,7 @@ public class NamingTest {
 
     LOG.log(Level.FINEST, this.name.getMethodName());
 
-    final NameServer server = new NameServer(0, this.factory);
+    final NameServer server = new NameServerImpl(0, this.factory);
     this.port = server.getPort();
 
     // names to start with
@@ -278,7 +278,7 @@ public class NamingTest {
 
     LOG.log(Level.FINEST, this.name.getMethodName());
 
-    final NameServer server = new NameServer(0, this.factory);
+    final NameServer server = new NameServerImpl(0, this.factory);
     this.port = server.getPort();
 
     final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7492d26b/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index 8deb694..96cf7d4 100644
--- a/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -24,9 +24,12 @@ import org.apache.reef.io.network.Message;
 import org.apache.reef.io.network.impl.MessagingTransportFactory;
 import org.apache.reef.io.network.impl.NetworkService;
 import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.NameServerImpl;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.services.network.util.Monitor;
 import org.apache.reef.services.network.util.StringCodec;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
@@ -34,7 +37,7 @@ import org.apache.reef.wake.remote.NetUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-
+import org.junit.Assert;
 import java.net.InetSocketAddress;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,7 +65,7 @@ public class NetworkServiceTest {
     IdentifierFactory factory = new StringIdentifierFactory();
     String nameServerAddr = NetUtils.getLocalAddress();
 
-    NameServer server = new NameServer(0, factory);
+    NameServer server = new NameServerImpl(0, factory);
     int nameServerPort = server.getPort();
 
     final int numMessages = 10;
@@ -120,7 +123,7 @@ public class NetworkServiceTest {
     IdentifierFactory factory = new StringIdentifierFactory();
     String nameServerAddr = NetUtils.getLocalAddress();
 
-    NameServer server = new NameServer(0, factory);
+    NameServer server = new NameServerImpl(0, factory);
     int nameServerPort = server.getPort();
 
     final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
@@ -194,7 +197,7 @@ public class NetworkServiceTest {
     final IdentifierFactory factory = new StringIdentifierFactory();
     final String nameServerAddr = NetUtils.getLocalAddress();
 
-    final NameServer server = new NameServer(0, factory);
+    final NameServer server = new NameServerImpl(0, factory);
     final int nameServerPort = server.getPort();
 
     BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
@@ -287,7 +290,7 @@ public class NetworkServiceTest {
     IdentifierFactory factory = new StringIdentifierFactory();
     String nameServerAddr = NetUtils.getLocalAddress();
 
-    NameServer server = new NameServer(0, factory);
+    NameServer server = new NameServerImpl(0, factory);
     int nameServerPort = server.getPort();
 
     final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
@@ -379,7 +382,7 @@ public class NetworkServiceTest {
     IdentifierFactory factory = new StringIdentifierFactory();
     String nameServerAddr = NetUtils.getLocalAddress();
 
-    NameServer server = new NameServer(0, factory);
+    NameServer server = new NameServerImpl(0, factory);
     int nameServerPort = server.getPort();
 
     final int batchSize = 1024 * 1024;