You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:00 UTC

[27/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
new file mode 100644
index 0000000..f22f970
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.reef.io.network.exception.NetworkRuntimeException;
+import org.apache.reef.io.network.proto.ReefNetworkServiceProtos.NSMessagePBuf;
+import org.apache.reef.io.network.proto.ReefNetworkServiceProtos.NSRecordPBuf;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Network service message codec
+ * <p>
+ * Two wire formats are supported and the choice is made once, at construction
+ * time: when the record codec is a {@link StreamingCodec}, the source id, the
+ * destination id, the record count and the records are written straight to a
+ * data stream; otherwise the message is wrapped in an {@link NSMessagePBuf}
+ * holding one {@link NSRecordPBuf} per record.
+ *
+ * @param <T> type of the records carried by a message
+ */
+public class NSMessageCodec<T> implements Codec<NSMessage<T>> {
+
+  private final Codec<T> codec;
+  private final IdentifierFactory factory;
+  private final boolean isStreamingCodec;
+
+  /**
+   * Constructs a network service message codec
+   *
+   * @param codec   a codec for the individual records of a message
+   * @param factory an identifier factory used to rebuild the ids when decoding
+   */
+  public NSMessageCodec(final Codec<T> codec, final IdentifierFactory factory) {
+    this.codec = codec;
+    this.factory = factory;
+    this.isStreamingCodec = codec instanceof StreamingCodec;
+  }
+
+  /**
+   * Encodes a network service message to bytes
+   *
+   * @param obj a message
+   * @return bytes
+   * @throws RuntimeException if writing the streaming format raises an IOException
+   */
+  @Override
+  public byte[] encode(final NSMessage<T> obj) {
+    if (isStreamingCodec) {
+      final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+        // The data stream is closed before the bytes are taken from the buffer.
+        try (DataOutputStream daos = new DataOutputStream(baos)) {
+          daos.writeUTF(obj.getSrcId().toString());
+          daos.writeUTF(obj.getDestId().toString());
+          daos.writeInt(obj.getData().size());
+          for (final T rec : obj.getData()) {
+            streamingCodec.encodeToStream(rec, daos);
+          }
+        }
+        return baos.toByteArray();
+      } catch (final IOException e) {
+        throw new RuntimeException("IOException", e);
+      }
+    } else {
+      final NSMessagePBuf.Builder pbuf = NSMessagePBuf.newBuilder();
+      pbuf.setSrcid(obj.getSrcId().toString());
+      pbuf.setDestid(obj.getDestId().toString());
+      for (final T rec : obj.getData()) {
+        final NSRecordPBuf.Builder rbuf = NSRecordPBuf.newBuilder();
+        rbuf.setData(ByteString.copyFrom(codec.encode(rec)));
+        pbuf.addMsgs(rbuf);
+      }
+      return pbuf.build().toByteArray();
+    }
+  }
+
+  /**
+   * Decodes a network service message from bytes
+   *
+   * @param buf bytes, in the format produced by {@link #encode(NSMessage)}
+   * @return a message
+   * @throws RuntimeException        if reading the streaming format raises an IOException
+   * @throws NetworkRuntimeException if the bytes are not a valid protocol buffer message
+   */
+  @Override
+  public NSMessage<T> decode(final byte[] buf) {
+    if (isStreamingCodec) {
+      final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
+      try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
+        try (DataInputStream dais = new DataInputStream(bais)) {
+          // Fields are read back in exactly the order encode() wrote them.
+          final Identifier srcId = factory.getNewInstance(dais.readUTF());
+          final Identifier destId = factory.getNewInstance(dais.readUTF());
+          final int size = dais.readInt();
+          final List<T> list = new ArrayList<>(size);
+          for (int i = 0; i < size; i++) {
+            list.add(streamingCodec.decodeFromStream(dais));
+          }
+          return new NSMessage<>(srcId, destId, list);
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException("IOException", e);
+      }
+    } else {
+      final NSMessagePBuf pbuf;
+      try {
+        pbuf = NSMessagePBuf.parseFrom(buf);
+      } catch (final InvalidProtocolBufferException e) {
+        // The failure is handed to the caller with its cause attached, so it
+        // is not additionally dumped to stderr here.
+        throw new NetworkRuntimeException(e);
+      }
+      final List<NSRecordPBuf> records = pbuf.getMsgsList();
+      final List<T> list = new ArrayList<>(records.size());
+      for (final NSRecordPBuf rbuf : records) {
+        list.add(codec.decode(rbuf.getData().toByteArray()));
+      }
+      return new NSMessage<>(
+          factory.getNewInstance(pbuf.getSrcid()), factory.getNewInstance(pbuf.getDestid()), list);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
new file mode 100644
index 0000000..f569fcd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Closes the injected {@link NameServer} when a {@link ContextStop} event arrives.
+ */
+public final class NameServiceCloseHandler implements EventHandler<ContextStop> {
+
+  private static final Logger LOG = Logger.getLogger(NameServiceCloseHandler.class.getName());
+
+  private final AutoCloseable toClose;
+
+  /**
+   * @param toClose the name server to close on context stop
+   */
+  @Inject
+  public NameServiceCloseHandler(final NameServer toClose) {
+    this.toClose = toClose;
+  }
+
+  /**
+   * Closes the name server. A failure is logged at SEVERE level and not rethrown.
+   *
+   * @param event the context stop event; its contents are not inspected
+   */
+  @Override
+  public void onNext(final ContextStop event) {
+    final AutoCloseable target = this.toClose;
+    try {
+      LOG.log(Level.FINEST, "Closing {0}", target);
+      target.close();
+    } catch (final Throwable ex) {
+      LOG.log(Level.SEVERE, "Exception while closing " + target, ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
new file mode 100644
index 0000000..6120193
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.naming.Naming;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.io.network.naming.NameCache;
+import org.apache.reef.io.network.naming.NameClient;
+import org.apache.reef.io.network.naming.NameLookupClient;
+import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.*;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Network service for Task
+ * <p>
+ * Owns the transport used to exchange messages, the name client used to
+ * register and unregister identifiers with the name service, and a map of
+ * connections keyed by destination identifier.
+ */
+public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
+
+  private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
+
+  private static final int retryCount;
+  private static final int retryTimeout;
+
+  static {
+    // Look up the retry parameters once through an unconfigured injector; the
+    // constructor that does not take them explicitly falls back to these values.
+    try {
+      final Injector injector = Tang.Factory.getTang().newInjector();
+      retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
+      retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+    } catch (final InjectionException ex) {
+      final String msg = "Exception while trying to find default values for retryCount & Timeout";
+      LOG.log(Level.SEVERE, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  private final IdentifierFactory factory;
+  private final Codec<T> codec;
+  private final Transport transport;
+  private final NameClient nameClient;
+  private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
+  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
+  private final EStage<Identifier> nameServiceUnregisteringStage;
+  private Identifier myId;
+
+  /**
+   * Creates a network service that uses the retry count and retry timeout
+   * resolved in the static initializer.
+   *
+   * @param factory        identifier factory
+   * @param nsPort         port the transport is created on
+   * @param nameServerAddr address of the name server
+   * @param nameServerPort port of the name server
+   * @param codec          codec for the records carried in messages
+   * @param tpFactory      factory used to create the transport
+   * @param recvHandler    handler invoked with every decoded incoming message
+   * @param exHandler      handler passed to the transport for exceptions
+   */
+  public NetworkService(final IdentifierFactory factory,
+                        final int nsPort,
+                        final String nameServerAddr,
+                        final int nameServerPort,
+                        final Codec<T> codec,
+                        final TransportFactory tpFactory,
+                        final EventHandler<Message<T>> recvHandler,
+                        final EventHandler<Exception> exHandler) {
+    this(factory, nsPort, nameServerAddr, nameServerPort,
+        retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler);
+  }
+
+  /**
+   * Creates a network service: builds the transport, the name client and the
+   * two stages that perform name service registration and unregistration.
+   *
+   * @param factory        identifier factory
+   * @param nsPort         port the transport is created on
+   * @param nameServerAddr address of the name server
+   * @param nameServerPort port of the name server
+   * @param retryCount     retry count handed to the name client
+   * @param retryTimeout   retry timeout handed to the name client
+   * @param codec          codec for the records carried in messages
+   * @param tpFactory      factory used to create the transport
+   * @param recvHandler    handler invoked with every decoded incoming message
+   * @param exHandler      handler passed to the transport for exceptions
+   */
+  @Inject
+  public NetworkService(
+      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory,
+      final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort,
+      final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr,
+      final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort,
+      final @Parameter(NameLookupClient.RetryCount.class) int retryCount,
+      final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout,
+      final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec,
+      final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory,
+      final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler,
+      final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) {
+
+    this.factory = factory;
+    this.codec = codec;
+    this.transport = tpFactory.create(nsPort,
+        new LoggingEventHandler<TransportEvent>(),
+        new MessageHandler<T>(recvHandler, codec, factory), exHandler);
+
+    this.nameClient = new NameClient(nameServerAddr, nameServerPort,
+        factory, retryCount, retryTimeout, new NameCache(30000));
+
+    this.nameServiceRegisteringStage = new SingleThreadStage<>(
+        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
+      @Override
+      public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
+        try {
+          nameClient.register(tuple.getKey(), tuple.getValue());
+          LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
+        } catch (final Exception ex) {
+          final String msg = "Unable to register " + tuple.getKey() + " with name service";
+          LOG.log(Level.WARNING, msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+      }
+    }, 5);
+
+    // Named distinctly from the registering stage so the two can be told apart.
+    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
+        "NameServiceUnregisterer", new EventHandler<Identifier>() {
+      @Override
+      public void onNext(final Identifier id) {
+        try {
+          nameClient.unregister(id);
+          LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
+        } catch (final Exception ex) {
+          final String msg = "Unable to unregister " + id + " with name service";
+          LOG.log(Level.WARNING, msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+      }
+    }, 5);
+  }
+
+  /**
+   * Records {@code id} as this service's identifier and hands the pair of
+   * identifier and transport local address to the registering stage.
+   *
+   * @param id the identifier to bind to this network service
+   */
+  public void registerId(final Identifier id) {
+    this.myId = id;
+    final Tuple<Identifier, InetSocketAddress> tuple =
+        new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
+    LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})",
+        new Object[]{tuple.getKey(), tuple.getValue()});
+    this.nameServiceRegisteringStage.onNext(tuple);
+  }
+
+  /**
+   * Clears this service's identifier (whatever {@code id} is) and hands
+   * {@code id} to the unregistering stage.
+   *
+   * @param id the identifier to unregister from the name service
+   */
+  public void unregisterId(final Identifier id) {
+    this.myId = null;
+    LOG.log(Level.FINEST, "Unbinding {0} from NetworkService@({1})",
+        new Object[]{id, this.transport.getLocalAddress()});
+    this.nameServiceUnregisteringStage.onNext(id);
+  }
+
+  /**
+   * @return the identifier currently bound to this service, or null if none is bound
+   */
+  public Identifier getMyId() {
+    return this.myId;
+  }
+
+  /**
+   * @return the transport created by this service
+   */
+  public Transport getTransport() {
+    return this.transport;
+  }
+
+  /**
+   * @return the record codec this service was constructed with
+   */
+  public Codec<T> getCodec() {
+    return this.codec;
+  }
+
+  /**
+   * @return the name client created by this service
+   */
+  public Naming getNameClient() {
+    return this.nameClient;
+  }
+
+  /**
+   * @return the identifier factory this service was constructed with
+   */
+  public IdentifierFactory getIdentifierFactory() {
+    return this.factory;
+  }
+
+  /**
+   * Drops the cached connection to {@code id}, if any.
+   *
+   * @param id destination identifier of the connection to forget
+   */
+  void remove(final Identifier id) {
+    this.idToConnMap.remove(id);
+  }
+
+  /**
+   * Closes the registering and unregistering stages, then the transport and
+   * the name client.
+   *
+   * @throws Exception if one of the resources fails to close; resources after
+   *                   the failing one are then left unclosed
+   */
+  @Override
+  public void close() throws Exception {
+    LOG.log(Level.FINE, "Shutting down");
+    // The stages are created by this object and nothing else closes them.
+    this.nameServiceRegisteringStage.close();
+    this.nameServiceUnregisteringStage.close();
+    this.transport.close();
+    this.nameClient.close();
+  }
+
+  /**
+   * Returns the connection to {@code destId}, creating and caching it on first use.
+   *
+   * @param destId identifier of the destination
+   * @return the cached connection for {@code destId}; if several callers race
+   *         to create it, all of them get the instance that was stored first
+   * @throws RuntimeException if no identifier is currently bound to this service
+   */
+  @Override
+  public Connection<T> newConnection(final Identifier destId) {
+
+    // Read the field once so the null check and the use below see the same value.
+    final Identifier srcId = this.myId;
+    if (srcId == null) {
+      throw new RuntimeException(
+          "Trying to establish a connection from a Network Service that is not bound to any task");
+    }
+
+    final Connection<T> conn = this.idToConnMap.get(destId);
+    if (conn != null) {
+      return conn;
+    }
+
+    final Connection<T> newConnection = new NSConnection<T>(
+        srcId, destId, new LinkListener<T>() {
+      @Override
+      public void messageReceived(final Object message) {
+      }
+    }, this);
+
+    final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
+    return existing == null ? newConnection : existing;
+  }
+}
+
+/**
+ * Transport event handler that decodes the payload of each {@link TransportEvent}
+ * with an {@link NSMessageCodec} and passes the resulting message on to the
+ * wrapped handler.
+ */
+class MessageHandler<T> implements EventHandler<TransportEvent> {
+
+  private final EventHandler<Message<T>> handler;
+  private final NSMessageCodec<T> codec;
+
+  /**
+   * @param handler receiver of the decoded messages
+   * @param codec   codec for the records inside a message
+   * @param factory identifier factory used when decoding message ids
+   */
+  public MessageHandler(final EventHandler<Message<T>> handler,
+                        final Codec<T> codec, final IdentifierFactory factory) {
+    this.codec = new NSMessageCodec<T>(codec, factory);
+    this.handler = handler;
+  }
+
+  /**
+   * Decodes the bytes of the event and forwards the message.
+   *
+   * @param value the transport event carrying the encoded message
+   */
+  @Override
+  public void onNext(final TransportEvent value) {
+    this.handler.onNext(this.codec.decode(value.getData()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
new file mode 100644
index 0000000..6e17922
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Closes the injected {@link NetworkService} when a {@link ContextStop} event arrives.
+ */
+public class NetworkServiceClosingHandler implements EventHandler<ContextStop> {
+
+  private final NetworkService<?> networkService;
+
+  /**
+   * @param networkService the network service to close on context stop
+   */
+  @Inject
+  public NetworkServiceClosingHandler(final NetworkService<?> networkService) {
+    this.networkService = networkService;
+  }
+
+  /**
+   * Closes the network service.
+   *
+   * @param arg0 the context stop event; its contents are not inspected
+   * @throws RuntimeException wrapping any exception raised by the close call
+   */
+  @Override
+  public void onNext(final ContextStop arg0) {
+    try {
+      this.networkService.close();
+    } catch (final Exception closeFailure) {
+      throw new RuntimeException("Exception while closing NetworkService", closeFailure);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
new file mode 100644
index 0000000..a31c5de
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+/**
+ * Named parameters for the network service.
+ */
+public class NetworkServiceParameters {
+
+  /**
+   * A String-valued task id parameter (declared without doc or default).
+   */
+  @NamedParameter
+  public static class TaskId implements Name<String> {
+  }
+
+  /**
+   * Identifier factory; defaults to {@link StringIdentifierFactory}.
+   */
+  @NamedParameter(
+      doc = "identifier factory for the service",
+      short_name = "factory",
+      default_class = StringIdentifierFactory.class)
+  public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
+  }
+
+  /**
+   * Network service port; defaults to 7070.
+   */
+  @NamedParameter(
+      doc = "port for the network service",
+      short_name = "nsport",
+      default_value = "7070")
+  public static class NetworkServicePort implements Name<Integer> {
+  }
+
+  /**
+   * Codec for the network service; no default.
+   */
+  @NamedParameter(
+      doc = "codec for the network service",
+      short_name = "nscodec")
+  public static class NetworkServiceCodec implements Name<Codec<?>> {
+  }
+
+  /**
+   * Transport factory; defaults to {@link MessagingTransportFactory}.
+   */
+  @NamedParameter(
+      doc = "transport factory for the network service",
+      short_name = "nstransportfactory",
+      default_class = MessagingTransportFactory.class)
+  public static class NetworkServiceTransportFactory implements Name<TransportFactory> {
+  }
+
+  /**
+   * Handler for received messages; no default.
+   */
+  @NamedParameter(
+      doc = "network receive handler for the network service",
+      short_name = "nshandler")
+  public static class NetworkServiceHandler implements Name<EventHandler<?>> {
+  }
+
+  /**
+   * Handler for exceptions; no default.
+   */
+  @NamedParameter(
+      doc = "network exception handler for the network service",
+      short_name = "exhandler")
+  public static class NetworkServiceExceptionHandler implements Name<EventHandler<?>> {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
new file mode 100644
index 0000000..ac669af
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A codec that can make serialization more efficient when an object has to be
+ * codec'ed through a chain of codecs: besides the byte-array methods inherited
+ * from {@link Codec}, it can write to and read from data streams directly.
+ *
+ * @param <T> type of the objects handled by this codec
+ */
+public interface StreamingCodec<T> extends Codec<T> {
+
+  /**
+   * Encodes an object onto the given stream.
+   * <p>
+   * No checked exception is declared, so an implementation cannot let an
+   * IOException from the stream propagate as-is.
+   *
+   * @param obj    the object to encode
+   * @param stream the stream to write to
+   */
+  void encodeToStream(T obj, DataOutputStream stream);
+
+  /**
+   * Decodes one object from the given stream.
+   * <p>
+   * No checked exception is declared, so an implementation cannot let an
+   * IOException from the stream propagate as-is.
+   *
+   * @param stream the stream to read from
+   * @return the decoded object
+   */
+  T decodeFromStream(DataInputStream stream);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
new file mode 100644
index 0000000..df337d9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * On {@link TaskStop}, unregisters the stopped task's identifier from the
+ * injected {@link NetworkService}.
+ */
+public class UnbindNSFromTask implements EventHandler<TaskStop> {
+
+  private final NetworkService<?> networkService;
+  private final IdentifierFactory identifierFactory;
+
+  /**
+   * @param ns    the network service to unregister the task id from
+   * @param idFac factory that turns the task id into an identifier
+   */
+  @Inject
+  public UnbindNSFromTask(
+      final NetworkService<?> ns,
+      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
+    this.networkService = ns;
+    this.identifierFactory = idFac;
+  }
+
+  /**
+   * Builds an identifier from the id of the stopped task and unregisters it.
+   *
+   * @param task the task stop event
+   */
+  @Override
+  public void onNext(final TaskStop task) {
+    this.networkService.unregisterId(
+        this.identifierFactory.getNewInstance(task.getId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
new file mode 100644
index 0000000..fbdc2ef
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
new file mode 100644
index 0000000..c7792fd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Immutable pairing of an identifier with an Internet socket address,
+ * implementing {@link NameAssignment}.
+ */
+public class NameAssignmentTuple implements NameAssignment {
+
+  private final Identifier id;
+  private final InetSocketAddress addr;
+
+  /**
+   * Creates a name assignment for the given pair.
+   *
+   * @param id   the identifier being assigned
+   * @param addr the Internet socket address assigned to it
+   */
+  public NameAssignmentTuple(final Identifier id, final InetSocketAddress addr) {
+    this.id = id;
+    this.addr = addr;
+  }
+
+  /**
+   * Returns the identifier of this assignment.
+   *
+   * @return the identifier passed to the constructor
+   */
+  @Override
+  public Identifier getIdentifier() {
+    return this.id;
+  }
+
+  /**
+   * Returns the address of this assignment.
+   *
+   * @return the Internet socket address passed to the constructor
+   */
+  @Override
+  public InetSocketAddress getAddress() {
+    return this.addr;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
new file mode 100644
index 0000000..384bfe4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import com.google.common.cache.CacheBuilder;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Naming cache implementation
+ */
+public class NameCache implements Cache<Identifier, InetSocketAddress> {
+
+  private final com.google.common.cache.Cache<Identifier, InetSocketAddress> cache;
+
+  /**
+   * Constructs a naming cache whose entries expire a fixed time after they are written
+   *
+   * @param timeout a cache entry timeout after write, in milliseconds
+   */
+  public NameCache(final long timeout) {
+    this.cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(timeout, TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  /**
+   * Gets an address for an identifier, passing valueFetcher to the underlying cache as the loader
+   *
+   * @param key          an identifier
+   * @param valueFetcher a callable to load a value for the corresponding identifier
+   * @return an Internet socket address
+   * @throws ExecutionException if valueFetcher throws a checked exception
+   */
+  @Override
+  public InetSocketAddress get(final Identifier key,
+                               final Callable<InetSocketAddress> valueFetcher) throws ExecutionException {
+    return this.cache.get(key, valueFetcher);
+  }
+
+  /**
+   * Invalidates the entry for an identifier
+   *
+   * @param key an identifier
+   */
+  @Override
+  public void invalidate(final Identifier key) {
+    this.cache.invalidate(key);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
new file mode 100644
index 0000000..79b4a92
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.Naming;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming client
+ */
+public class NameClient implements Stage, Naming {
+  private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
+
+  private final NameLookupClient lookupClient;
+  private final NameRegistryClient registryClient;
+  private final Transport transport;
+
+  /**
+   * Constructs a naming client with a timeout of 10000 ms
+   *
+   * @param serverAddr   a server address
+   * @param serverPort   a server port number
+   * @param factory      an identifier factory
+   * @param retryCount   a retry count, handed to the transport and the lookup client
+   * @param retryTimeout a retry timeout, handed to the transport and the lookup client
+   * @param cache        a cache
+   */
+  public NameClient(String serverAddr, int serverPort,
+                    IdentifierFactory factory, int retryCount, int retryTimeout,
+                    Cache<Identifier, InetSocketAddress> cache) {
+    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
+  }
+
+  /**
+   * Constructs a naming client
+   *
+   * @param serverAddr   a server address
+   * @param serverPort   a server port number
+   * @param timeout      timeout in ms
+   * @param factory      an identifier factory
+   * @param retryCount   a retry count, handed to the transport and the lookup client
+   * @param retryTimeout a retry timeout, handed to the transport and the lookup client
+   * @param cache        a cache
+   */
+  public NameClient(final String serverAddr, final int serverPort, final long timeout,
+                    final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+                    final Cache<Identifier, InetSocketAddress> cache) {
+    // Both sub-clients share one transport; the response handler fills these two reply queues
+    final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
+    final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
+    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+        new SyncStage<>(new NamingClientEventHandler(
+            new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
+        null, retryCount, retryTimeout);
+
+    this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout,
+        factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache);
+
+    this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout,
+        factory, replyRegisterQueue, this.transport);
+  }
+
+  /**
+   * Registers an (identifier, address) mapping
+   *
+   * @param id   an identifier
+   * @param addr an Internet socket address
+   */
+  @Override
+  public void register(final Identifier id, final InetSocketAddress addr)
+      throws Exception {
+    LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr});
+    this.registryClient.register(id, addr);
+  }
+
+  /**
+   * Unregisters an identifier
+   *
+   * @param id an identifier
+   */
+  @Override
+  public void unregister(final Identifier id) throws IOException {
+    this.registryClient.unregister(id);
+  }
+
+  /**
+   * Finds an address for an identifier
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   */
+  @Override
+  public InetSocketAddress lookup(final Identifier id) throws Exception {
+    return this.lookupClient.lookup(id);
+  }
+
+  /**
+   * Retrieves an address for an identifier remotely
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   * @throws Exception
+   */
+  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
+    return this.lookupClient.remoteLookup(id);
+  }
+
+  /**
+   * Closes resources
+   *
+   * The transport is closed even when closing one of the sub-clients throws
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      this.lookupClient.close();
+      this.registryClient.close();
+    } finally {
+      this.transport.close();
+    }
+  }
+}
+
+/**
+ * Naming client transport event handler
+ */
+class NamingClientEventHandler implements EventHandler<TransportEvent> {
+  private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName());
+
+  private final EventHandler<NamingMessage> handler;
+  private final Codec<NamingMessage> codec;
+
+  public NamingClientEventHandler(
+      final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
+    this.handler = handler;
+    this.codec = codec;
+  }
+
+  /** Decodes the payload of a transport event and passes the message to the wrapped handler */
+  @Override
+  public void onNext(final TransportEvent value) {
+    LOG.log(Level.FINE, "Transport: {0}", value);
+    this.handler.onNext(this.codec.decode(value.getData()));
+  }
+}
+
+/**
+ * Naming response message handler
+ */
+class NamingResponseHandler implements EventHandler<NamingMessage> {
+  private final BlockingQueue<NamingLookupResponse> lookupReplies;
+  private final BlockingQueue<NamingRegisterResponse> registerReplies;
+
+  NamingResponseHandler(final BlockingQueue<NamingLookupResponse> replyLookupQueue,
+                        final BlockingQueue<NamingRegisterResponse> replyRegisterQueue) {
+    this.lookupReplies = replyLookupQueue;
+    this.registerReplies = replyRegisterQueue;
+  }
+
+  /** Queues a response on the reply queue matching its type; any other message is rejected */
+  @Override
+  public void onNext(final NamingMessage value) {
+    if (value instanceof NamingLookupResponse) {
+      this.lookupReplies.offer((NamingLookupResponse) value);
+      return;
+    }
+    if (value instanceof NamingRegisterResponse) {
+      this.registerReplies.offer((NamingRegisterResponse) value);
+      return;
+    }
+    throw new NamingRuntimeException("Unknown naming response message");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
new file mode 100644
index 0000000..31b7fca
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.naming.NamingLookup;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
+import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming lookup client
+ */
+public class NameLookupClient implements Stage, NamingLookup {
+
+  private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName());
+  private final SocketAddress serverSocketAddr;
+  private final Transport transport;
+  private final Codec<NamingMessage> codec;
+  private final BlockingQueue<NamingLookupResponse> replyQueue;
+  private final long timeout;
+  private final Cache<Identifier, InetSocketAddress> cache;
+  private final int retryCount;
+  private final int retryTimeout;
+
+  /**
+   * Constructs a naming lookup client with a request timeout of 10000 ms
+   *
+   * @param serverAddr   a server address
+   * @param serverPort   a server port number
+   * @param factory      an identifier factory
+   * @param retryCount   how many times a failed lookup is retried; also passed to the transport
+   * @param retryTimeout base wait in ms between lookup retries; also passed to the transport
+   * @param cache        a cache
+   */
+  public NameLookupClient(final String serverAddr, final int serverPort,
+                          final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+                          final Cache<Identifier, InetSocketAddress> cache) {
+    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
+  }
+
+  /**
+   * Constructs a naming lookup client
+   *
+   * @param serverAddr   a server address
+   * @param serverPort   a server port number
+   * @param timeout      request timeout in ms
+   * @param factory      an identifier factory
+   * @param retryCount   how many times a failed lookup is retried; also passed to the transport
+   * @param retryTimeout base wait in ms between lookup retries; also passed to the transport
+   * @param cache        a cache
+   */
+  public NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
+                          final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+                          final Cache<Identifier, InetSocketAddress> cache) {
+    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+    this.timeout = timeout;
+    this.cache = cache;
+    this.codec = NamingCodecFactory.createLookupCodec(factory);
+    this.replyQueue = new LinkedBlockingQueue<>();
+    this.retryCount = retryCount;
+    this.retryTimeout = retryTimeout;
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+        new SyncStage<>(new NamingLookupClientHandler(
+            new NamingLookupResponseHandler(this.replyQueue), this.codec)),
+        null, retryCount, retryTimeout);
+  }
+
+  /** Constructs a lookup client on a transport and reply queue supplied by the caller */
+  NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
+                   final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+                   final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport,
+                   final Cache<Identifier, InetSocketAddress> cache) {
+    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+    this.timeout = timeout;
+    this.cache = cache;
+    this.codec = NamingCodecFactory.createFullCodec(factory);
+    this.replyQueue = replyQueue;
+    this.transport = transport;
+    this.retryCount = retryCount;
+    this.retryTimeout = retryTimeout;
+  }
+
+  /**
+   * Finds an address for an identifier through the cache. The value fetcher given to the cache
+   * queries the name server and retries up to retryCount times when a NamingException is thrown.
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   */
+  @Override
+  public InetSocketAddress lookup(final Identifier id) throws Exception {
+    return cache.get(id, new Callable<InetSocketAddress>() {
+      @Override
+      public InetSocketAddress call() throws Exception {
+        final int origRetryCount = NameLookupClient.this.retryCount;
+        int retryCount = origRetryCount;
+        while (true) {
+          try {
+            return remoteLookup(id);
+          } catch (final NamingException e) {
+            if (retryCount <= 0) {
+              throw e;
+            } else {
+              final int retryTimeout = NameLookupClient.this.retryTimeout
+                  * (origRetryCount - retryCount + 1);
+              // Log the duration that is actually slept, not just the per-attempt factor
+              final int sleepTime = retryTimeout * retryCount;
+              LOG.log(Level.WARNING, "Caught Naming Exception while looking up " + id
+                  + " with Name Server. Will retry " + retryCount
+                  + " time(s) after waiting for " + sleepTime + " msec.");
+              Thread.sleep(sleepTime);
+              --retryCount;
+            }
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Retrieves an address for an identifier remotely
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   * @throws Exception a NamingException when the reply times out, the wait is interrupted, or the id is not found
+   */
+  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
+    // the lookup is not thread-safe, because concurrent replies may
+    // be read by the wrong thread.
+    // TODO: better fix uses a map of id's after REEF-198
+    synchronized (this) {
+      LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr});
+      final List<Identifier> ids = Arrays.asList(id);
+      final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
+          new LoggingLinkListener<NamingMessage>());
+      link.write(new NamingLookupRequest(ids));
+
+      final NamingLookupResponse resp;
+      try {
+        resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS);
+      } catch (final InterruptedException e) {
+        LOG.log(Level.INFO, "Lookup interrupted", e);
+        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+        throw new NamingException(e);
+      }
+      if (resp == null) { // poll() returns null when the timeout elapses without a reply
+        throw new NamingException("Timed out waiting for " + id + " from the name server");
+      }
+
+      final List<NameAssignment> list = resp.getNameAssignments();
+      if (list.isEmpty()) {
+        throw new NamingException("Cannot find " + id + " from the name server");
+      } else {
+        return list.get(0).getAddress();
+      }
+    }
+  }
+
+  /**
+   * Does nothing; this client never closes its transport
+   */
+  @Override
+  public void close() throws Exception {
+    // The transport is deliberately left open. NOTE(review): the public constructors
+    // create their own transport, which nothing in this class closes - confirm intended
+  }
+
+  @NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100")
+  public static class RetryTimeout implements Name<Integer> {
+  }
+
+  @NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10")
+  public static class RetryCount implements Name<Integer> {
+  }
+}
+
+/**
+ * Naming lookup client transport event handler
+ */
+class NamingLookupClientHandler implements EventHandler<TransportEvent> {
+  private final EventHandler<NamingLookupResponse> responseHandler;
+  private final Codec<NamingMessage> messageCodec;
+
+  NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) {
+    this.responseHandler = handler;
+    this.messageCodec = codec;
+  }
+
+  /** Decodes the event payload and forwards it, cast to a lookup response, to the handler */
+  @Override
+  public void onNext(final TransportEvent value) {
+    final NamingMessage decoded = this.messageCodec.decode(value.getData());
+    this.responseHandler.onNext((NamingLookupResponse) decoded);
+  }
+}
+
+/**
+ * Naming lookup response handler
+ */
+class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> {
+  private final BlockingQueue<NamingLookupResponse> responses;
+
+  NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) {
+    this.responses = replyQueue;
+  }
+
+  /** Offers the received lookup response to the reply queue */
+  @Override
+  public void onNext(final NamingLookupResponse value) {
+    this.responses.offer(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
new file mode 100644
index 0000000..554a33b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NamingRegistry;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming registry client
+ */
+public class NameRegistryClient implements Stage, NamingRegistry {
+
+  private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName());
+
+  private final SocketAddress serverSocketAddr;
+  private final Transport transport;
+  private final Codec<NamingMessage> codec;
+  private final BlockingQueue<NamingRegisterResponse> replyQueue;
+  private final long timeout;
+
+  /**
+   * Constructs a naming registry client with a timeout of 10000 ms
+   *
+   * @param serverAddr a name server address
+   * @param serverPort a name server port
+   * @param factory    an identifier factory
+   */
+  public NameRegistryClient(
+      final String serverAddr, final int serverPort, final IdentifierFactory factory) {
+    this(serverAddr, serverPort, 10000, factory);
+  }
+
+  /**
+   * Constructs a naming registry client that creates its own transport and reply queue
+   *
+   * @param serverAddr a name server address
+   * @param serverPort a name server port
+   * @param timeout    timeout in ms
+   * @param factory    an identifier factory
+   */
+  public NameRegistryClient(final String serverAddr, final int serverPort,
+                            final long timeout, final IdentifierFactory factory) {
+
+    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+    this.timeout = timeout;
+    this.codec = NamingCodecFactory.createRegistryCodec(factory);
+    this.replyQueue = new LinkedBlockingQueue<>();
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+        new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)),
+        null, 3, 10000);
+  }
+
+  /** Constructs a registry client on a transport and reply queue supplied by the caller */
+  public NameRegistryClient(final String serverAddr, final int serverPort,
+                            final long timeout, final IdentifierFactory factory,
+                            final BlockingQueue<NamingRegisterResponse> replyQueue,
+                            final Transport transport) {
+    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+    this.timeout = timeout;
+    this.codec = NamingCodecFactory.createFullCodec(factory);
+    this.replyQueue = replyQueue;
+    this.transport = transport;
+  }
+
+  /**
+   * Registers an (identifier, address) mapping and waits up to the timeout for the response.
+   * A missing response is only logged as a warning; it does not fail the call.
+   *
+   * @param id   an identifier
+   * @param addr an Internet socket address
+   */
+  @Override
+  public void register(final Identifier id, final InetSocketAddress addr) throws Exception {
+    // needed to keep threads from reading the wrong response
+    // TODO: better fix matches replies to threads with a map after REEF-198
+    synchronized (this) {
+      LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr});
+
+      final Link<NamingMessage> link = this.transport.open(
+          this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>());
+
+      link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr)));
+
+      try {
+        if (this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS) == null) {
+          LOG.log(Level.WARNING, "No response from the name server for registration of {0}", id);
+        }
+      } catch (final InterruptedException e) {
+        LOG.log(Level.INFO, "Interrupted", e);
+        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+        throw new NamingException(e);
+      }
+    }
+  }
+
+  /**
+   * Unregisters an identifier; the request is written without waiting for a reply
+   *
+   * @param id an identifier
+   */
+  @Override
+  public void unregister(final Identifier id) throws IOException {
+    final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
+        new LinkListener<NamingMessage>() {
+          @Override
+          public void messageReceived(NamingMessage message) {
+          }
+        });
+    link.write(new NamingUnregisterRequest(id));
+  }
+
+  /**
+   * Does nothing; this client never closes its transport
+   */
+  @Override
+  public void close() throws Exception {
+    // The transport is deliberately left open. NOTE(review): the 3- and 4-argument constructors
+    // create their own transport, which nothing in this class closes - confirm intended
+  }
+}
+
+/**
+ * Naming registry client transport event handler
+ */
+class NamingRegistryClientHandler implements EventHandler<TransportEvent> {
+  private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName());
+  private final EventHandler<NamingRegisterResponse> responseHandler;
+  private final Codec<NamingMessage> messageCodec;
+
+  NamingRegistryClientHandler(final EventHandler<NamingRegisterResponse> handler, final Codec<NamingMessage> codec) {
+    this.responseHandler = handler;
+    this.messageCodec = codec;
+  }
+
+  /** Logs the event at FINE, then decodes its payload and forwards it as a register response */
+  @Override
+  public void onNext(final TransportEvent value) {
+    LOG.log(Level.FINE, value.toString());
+    this.responseHandler.onNext(
+        (NamingRegisterResponse) this.messageCodec.decode(value.getData()));
+  }
+}
+
+/**
+ * Naming register response handler
+ */
+class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> {
+  private final BlockingQueue<NamingRegisterResponse> responses;
+
+  NamingRegistryResponseHandler(final BlockingQueue<NamingRegisterResponse> replyQueue) {
+    this.responses = replyQueue;
+  }
+
+  /** Offers the received register response to the reply queue */
+  @Override
+  public void onNext(final NamingRegisterResponse value) {
+    this.responses.offer(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
new file mode 100644
index 0000000..5a4c765
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.MultiEventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.webserver.AvroReefServiceInfo;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server interface
+ */
+public interface NameServer extends Stage {
+
+  /**
+   * Gets the port number
+   * @return the port number
+   */
+  int getPort();
+
+  /**
+   * Adds an (identifier, address) mapping locally
+   *
+   * @param id   the identifier to register
+   * @param addr the Internet socket address to map it to
+   */
+  void register(Identifier id, InetSocketAddress addr);
+
+  /**
+   * Removes an identifier locally
+   *
+   * @param id the identifier to unregister
+   */
+  void unregister(Identifier id);
+
+  /**
+   * Looks up the address of a single identifier locally
+   *
+   * @param id the identifier to find
+   * @return an Internet socket address
+   */
+  InetSocketAddress lookup(Identifier id);
+
+  /**
+   * Looks up the addresses of several identifiers locally
+   *
+   * @param identifiers an Iterable of the identifiers to find
+   * @return a list of name assignments
+   */
+  List<NameAssignment> lookup(Iterable<Identifier> identifiers);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
new file mode 100644
index 0000000..1912686
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+/**
+ * Configuration Module Builder for NameServer.
+ * <p>
+ * The {@code CONF} module declared at the bottom of this class binds each of
+ * the three optional parameters below to its named parameter in
+ * {@link NameServerParameters} and binds {@link NameServer} to
+ * {@link NameServerImpl}. The parameter fields are declared before
+ * {@code CONF} because its initializer reads them.
+ */
+public final class NameServerConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * The port used by name server.
+   * Bound to {@link NameServerParameters.NameServerPort} in {@code CONF}.
+   */
+  public static final OptionalParameter<Integer> NAME_SERVICE_PORT = new OptionalParameter<>();
+  /**
+   * DNS hostname running the name service.
+   * Bound to {@link NameServerParameters.NameServerAddr} in {@code CONF}.
+   */
+  public static final OptionalParameter<String> NAME_SERVER_HOSTNAME = new OptionalParameter<>();
+  /**
+   * Identifier factory for the name service.
+   * Bound to {@link NameServerParameters.NameServerIdentifierFactory} in {@code CONF}.
+   */
+  public static final OptionalParameter<IdentifierFactory> NAME_SERVER_IDENTIFIER_FACTORY = new OptionalParameter<>();
+
+  public static final ConfigurationModule CONF = new NameServerConfiguration()
+      .bindNamedParameter(NameServerParameters.NameServerPort.class, NAME_SERVICE_PORT)
+      .bindNamedParameter(NameServerParameters.NameServerAddr.class, NAME_SERVER_HOSTNAME)
+      .bindNamedParameter(NameServerParameters.NameServerIdentifierFactory.class, NAME_SERVER_IDENTIFIER_FACTORY)
+      .bindImplementation(NameServer.class, NameServerImpl.class)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
new file mode 100644
index 0000000..306682a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.MultiEventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.webserver.AvroReefServiceInfo;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server implementation
+ */
+public class NameServerImpl implements NameServer {
+
+  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
+
+  private final Transport transport;
+  private final Map<Identifier, InetSocketAddress> idToAddrMap;
+  private final ReefEventStateManager reefEventStateManager;
+  private final int port;
+
+  /**
+   * @param port    a listening port number
+   * @param factory an identifier factory
+   * @deprecated inject the NameServer instead of new it up
+   * Constructs a name server
+   */
+  // TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
+  @Deprecated
+  public NameServerImpl(
+      final int port,
+      final IdentifierFactory factory) {
+
+    this.reefEventStateManager = null;
+    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+    final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+    this.port = transport.getListeningPort();
+    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+  }
+
+
+  /**
+   * Constructs a name server
+   *
+   * @param port                  a listening port number
+   * @param factory               an identifier factory
+   * @param reefEventStateManager the event state manager used to register name server info
+   */
+  @Inject
+  public NameServerImpl(
+      final @Parameter(NameServerParameters.NameServerPort.class) int port,
+      final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
+      final ReefEventStateManager reefEventStateManager) {
+
+    this.reefEventStateManager = reefEventStateManager;
+    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+    final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+    this.port = transport.getListeningPort();
+    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+    this.reefEventStateManager.registerServiceInfo(
+        AvroReefServiceInfo.newBuilder()
+            .setServiceName("NameServer")
+            .setServiceInfo(getNameServerId())
+            .build());
+    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+  }
+
+  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
+
+    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
+        clazzToHandlerMap = new HashMap<>();
+
+    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
+    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
+    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
+    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
+
+    return handler;
+  }
+
+  /**
+   * Gets port
+   */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Closes resources
+   */
+  @Override
+  public void close() throws Exception {
+    transport.close();
+  }
+
+  /**
+   * Registers an (identifier, address) mapping locally
+   *
+   * @param id   an identifier
+   * @param addr an Internet socket address
+   */
+  @Override
+  public void register(final Identifier id, final InetSocketAddress addr) {
+    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
+    idToAddrMap.put(id, addr);
+  }
+
+  /**
+   * Unregisters an identifier locally
+   *
+   * @param id an identifier
+   */
+  @Override
+  public void unregister(final Identifier id) {
+    LOG.log(Level.FINE, "id: " + id);
+    idToAddrMap.remove(id);
+  }
+
+  /**
+   * Finds an address for an identifier locally
+   *
+   * @param id an identifier
+   * @return an Internet socket address
+   */
+  @Override
+  public InetSocketAddress lookup(final Identifier id) {
+    LOG.log(Level.FINE, "id: {0}", id);
+    return idToAddrMap.get(id);
+  }
+
+  /**
+   * Finds addresses for identifiers locally
+   *
+   * @param identifiers an iterable of identifiers
+   * @return a list of name assignments
+   */
+  @Override
+  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
+    LOG.log(Level.FINE, "identifiers");
+    final List<NameAssignment> nas = new ArrayList<>();
+    for (final Identifier id : identifiers) {
+      final InetSocketAddress addr = idToAddrMap.get(id);
+      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
+      if (addr != null) {
+        nas.add(new NameAssignmentTuple(id, addr));
+      }
+    }
+    return nas;
+  }
+
+  private String getNameServerId() {
+    return NetUtils.getLocalAddress() + ":" + getPort();
+  }
+}
+
+/**
+ * Transport-level entry point of the naming server: converts each incoming
+ * transport event into a naming message and passes it to the wrapped handler.
+ */
+class NamingServerHandler implements EventHandler<TransportEvent> {
+
+  private final EventHandler<NamingMessage> handler;
+  private final Codec<NamingMessage> codec;
+
+  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
+    this.handler = handler;
+    this.codec = codec;
+  }
+
+  /**
+   * Decodes the payload of the event, attaches the link of the event to the
+   * decoded message, and forwards the message.
+   *
+   * @param event the transport event to process
+   */
+  @Override
+  public void onNext(final TransportEvent event) {
+    final NamingMessage decoded = codec.decode(event.getData());
+    decoded.setLink(event.getLink());
+    handler.onNext(decoded);
+  }
+}
+
+/**
+ * Naming lookup request handler: resolves the requested identifiers on the
+ * name server and writes the encoded response back on the request's link.
+ */
+class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
+
+  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
+
+
+  private final NameServer server;
+  private final Codec<NamingMessage> codec;
+
+  /**
+   * @param server the name server queried for each request
+   * @param codec  the codec used to encode the lookup response
+   */
+  public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+    this.server = server;
+    this.codec = codec;
+  }
+
+  /**
+   * Looks up the identifiers of the request and sends a {@link NamingLookupResponse}.
+   *
+   * @param value the lookup request
+   */
+  @Override
+  public void onNext(final NamingLookupRequest value) {
+    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
+    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
+    try {
+      value.getLink().write(resp);
+    } catch (final IOException e) {
+      // Link.write is not expected to throw an IOException after the netty4 merge;
+      // this catch needs to be cleaned up. Until then a failure is reported at
+      // WARNING, because the requester never receives its response.
+      LOG.log(Level.WARNING, "Unable to send the naming lookup response", e);
+    }
+  }
+}
+
+/**
+ * Naming register request handler: stores the requested name assignment on the
+ * name server and writes the encoded response back on the request's link.
+ */
+class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
+
+  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
+
+
+  private final NameServer server;
+  private final Codec<NamingMessage> codec;
+
+  /**
+   * @param server the name server that receives the registrations
+   * @param codec  the codec used to encode the register response
+   */
+  public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+    this.server = server;
+    this.codec = codec;
+  }
+
+  /**
+   * Registers the name assignment of the request and sends a {@link NamingRegisterResponse}.
+   *
+   * @param value the register request
+   */
+  @Override
+  public void onNext(final NamingRegisterRequest value) {
+    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
+    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
+    try {
+      value.getLink().write(resp);
+    } catch (final IOException e) {
+      // Link.write is not expected to throw an IOException after the netty4 merge;
+      // this catch needs to be cleaned up. Until then a failure is reported at
+      // WARNING, because the requester never receives its response.
+      LOG.log(Level.WARNING, "Unable to send the naming register response", e);
+    }
+  }
+}
+
+/**
+ * Handles unregister requests by removing the requested identifier from the
+ * name server. Nothing is written back on the link.
+ */
+class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
+
+  private final NameServer server;
+
+  /**
+   * @param nameServer the name server identifiers are removed from
+   */
+  public NamingUnregisterRequestHandler(final NameServer nameServer) {
+    server = nameServer;
+  }
+
+  /**
+   * Unregisters the identifier carried by the request.
+   *
+   * @param request the unregister request
+   */
+  @Override
+  public void onNext(final NamingUnregisterRequest request) {
+    final Identifier target = request.getIdentifier();
+    server.unregister(target);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
new file mode 100644
index 0000000..1888886
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+/**
+ * Named parameters of the name server.
+ * <p>
+ * The parameter classes are static nested classes: they are pure markers and
+ * need no reference to an enclosing {@code NameServerParameters} instance.
+ */
+public class NameServerParameters {
+
+  /**
+   * Port for the name service; the default value is "0" and the short name is "nameport".
+   */
+  @NamedParameter(doc = "port for the name service", default_value = "0", short_name = "nameport")
+  public static class NameServerPort implements Name<Integer> {
+  }
+
+  /**
+   * DNS hostname running the name service; no default is declared.
+   */
+  @NamedParameter(doc = "DNS hostname running the name service")
+  public static class NameServerAddr implements Name<String> {
+  }
+
+  /**
+   * Identifier factory for the name service; defaults to {@link StringIdentifierFactory}.
+   */
+  @NamedParameter(doc = "identifier factory for the name service", default_class = StringIdentifierFactory.class)
+  public static class NameServerIdentifierFactory implements Name<IdentifierFactory> {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
new file mode 100644
index 0000000..8ec320a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.MultiCodec;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory to create naming codecs
+ */
+class NamingCodecFactory {
+
+  /**
+   * Creates a codec only for lookup
+   *
+   * @param factory an identifier factory
+   * @return a codec
+   */
+  static Codec<NamingMessage> createLookupCodec(IdentifierFactory factory) {
+    Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+        = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+    clazzToCodecMap.put(NamingLookupRequest.class, new NamingLookupRequestCodec(factory));
+    clazzToCodecMap.put(NamingLookupResponse.class, new NamingLookupResponseCodec(factory));
+    Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+    return codec;
+  }
+
+  /**
+   * Creates a codec only for registration
+   *
+   * @param factory an identifier factory
+   * @return a codec
+   */
+  static Codec<NamingMessage> createRegistryCodec(IdentifierFactory factory) {
+    Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+        = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+    clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory));
+    clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+    clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory));
+    Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+    return codec;
+  }
+
+  /**
+   * Creates a codec for both lookup and registration
+   *
+   * @param factory an identifier factory
+   * @return a codec
+   */
+  static Codec<NamingMessage> createFullCodec(IdentifierFactory factory) {
+    Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+        = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+    clazzToCodecMap.put(NamingLookupRequest.class, new NamingLookupRequestCodec(factory));
+    clazzToCodecMap.put(NamingLookupResponse.class, new NamingLookupResponseCodec(factory));
+    clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory));
+    clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+    clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory));
+    Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+    return codec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
new file mode 100644
index 0000000..b5093b4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+/**
+ * Naming exception: the {@link NetworkException} subtype of the naming package.
+ */
+public class NamingException extends NetworkException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs a new naming exception with the specified cause.
+   *
+   * @param cause the cause
+   */
+  public NamingException(final Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Constructs a new naming exception with the specified detail message.
+   *
+   * @param message the detailed message
+   */
+  public NamingException(final String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a new naming exception with the specified detail message and cause.
+   *
+   * @param message the detailed message
+   * @param cause   the cause
+   */
+  public NamingException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
new file mode 100644
index 0000000..9ed0f49
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
+
+import org.apache.reef.io.network.exception.NetworkRuntimeException;
+
+/**
+ * Naming runtime exception: the {@link NetworkRuntimeException} subtype of the naming package.
+ */
+public class NamingRuntimeException extends NetworkRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs a new naming runtime exception with the specified cause.
+   *
+   * @param cause the cause
+   */
+  public NamingRuntimeException(final Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Constructs a new naming runtime exception with the specified detail message.
+   *
+   * @param message the detailed message
+   */
+  public NamingRuntimeException(final String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a new naming runtime exception with the specified detail message and cause.
+   *
+   * @param message the detailed message
+   * @param cause   the cause
+   */
+  public NamingRuntimeException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
new file mode 100644
index 0000000..86741b8
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
\ No newline at end of file