You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:03:39 UTC

[08/51] [abbrv] [partial] Initial merge of Wake, Tang and REEF into one repository and project

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java b/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
deleted file mode 100644
index e83fd27..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import com.microsoft.reef.io.naming.NameAssignment;
-import com.microsoft.reef.io.network.naming.*;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.tang.Injector;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.NetUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Naming server and client test
- */
-public class NamingTest {
-
-  /** Logger shared by all tests in this class. */
-  private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
-
-  /** JUnit rule exposing the name of the running test method; every test logs it on entry. */
-  @Rule
-  public final TestName name = new TestName();
-
-  /**
-   * Value handed to every {@code NameCache} the tests create.
-   * NOTE(review): presumably a time-to-live in milliseconds - confirm against NameCache.
-   */
-  final long TTL = 30000;
-  /** Port of the most recently started {@code NameServer}; each test overwrites it from {@code server.getPort()}. */
-  int port;
-  /** Factory used to build the {@code Identifier}s that the tests register and look up. */
-  final IdentifierFactory factory = new StringIdentifierFactory();
-
-  /** Value bound to {@code NameLookupClient.RetryCount}; assigned once in the static initializer below. */
-  private static final int retryCount;
-  /** Value bound to {@code NameLookupClient.RetryTimeout}; assigned once in the static initializer below. */
-  private static final int retryTimeout;
-
-  // Reads both retry settings from a fresh Tang injector. An InjectionException is logged at
-  // SEVERE and rethrown as a RuntimeException (with the cause attached), which aborts class
-  // initialization.
-  static {
-    try {
-      final Injector injector = Tang.Factory.getTang().newInjector();
-      retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
-      retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
-    } catch (final InjectionException ex) {
-      final String msg = "Exception while trying to find default values for retryCount & Timeout";
-      LOG.log(Level.SEVERE, msg, ex);
-      throw new RuntimeException(msg, ex);
-    }
-  }
-
-  /**
-   * NameServer and NameLookupClient test.
-   * <p>
-   * Registers two names directly on a {@link NameServer}, resolves both through a
-   * {@link NameLookupClient} and checks that the answers match what was registered.
-   * The client and the server are closed even when a lookup or the assertion fails.
-   *
-   * @throws Exception if the server or client cannot be created, a lookup fails or closing fails
-   */
-  @Test
-  public void testNamingLookup() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    // names
-    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
-    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-
-    // run a server (port 0 is passed in; the actual port is read back from the server)
-    final NameServer server = new NameServer(0, this.factory);
-    try {
-      this.port = server.getPort();
-      for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
-        server.register(entry.getKey(), entry.getValue());
-      }
-
-      // run a client
-      final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
-          10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
-      try {
-        final Identifier id1 = this.factory.getNewInstance("task1");
-        final Identifier id2 = this.factory.getNewInstance("task2");
-
-        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
-        respMap.put(id1, client.lookup(id1));
-        respMap.put(id2, client.lookup(id2));
-
-        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { entry.getKey(), entry.getValue() });
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, respMap));
-      } finally {
-        // release the client even if a lookup or the assertion above failed
-        client.close();
-      }
-    } finally {
-      // always release the server so a failure does not leave it running for later tests
-      server.close();
-    }
-  }
-
-  /**
-   * Test concurrent lookups (threads share a client).
-   * <p>
-   * Each of the three rounds starts a fresh {@link NameServer} and {@link NameLookupClient},
-   * resolves three names from three pool threads through the one shared client and checks
-   * the collected answers. The thread pool is shut down and the client and server are closed
-   * at the end of every round, also when the round fails.
-   *
-   * @throws Exception if setup fails, a lookup task fails or closing fails
-   */
-  @Test
-  public void testConcurrentNamingLookup() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    // test it 3 times to make failure likely
-    for (int i = 0; i < 3; i++) {
-
-      LOG.log(Level.FINEST, "test {0}", i);
-
-      // names
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-      idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
-
-      // run a server
-      final NameServer server = new NameServer(0, this.factory);
-      try {
-        this.port = server.getPort();
-        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
-          server.register(entry.getKey(), entry.getValue());
-        }
-
-        // run a client
-        final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
-            10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
-        final ExecutorService executor = Executors.newCachedThreadPool();
-        try {
-          final Identifier id1 = this.factory.getNewInstance("task1");
-          final Identifier id2 = this.factory.getNewInstance("task2");
-          final Identifier id3 = this.factory.getNewInstance("task3");
-
-          final ConcurrentMap<Identifier, InetSocketAddress> respMap =
-              new ConcurrentHashMap<Identifier, InetSocketAddress>();
-
-          final Future<?> f1 = executor.submit(newLookupTask(client, id1, respMap));
-          final Future<?> f2 = executor.submit(newLookupTask(client, id2, respMap));
-          final Future<?> f3 = executor.submit(newLookupTask(client, id3, respMap));
-
-          // get() rethrows (wrapped) whatever a task threw, including a failed assertion
-          f1.get();
-          f2.get();
-          f3.get();
-
-          for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
-            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { entry.getKey(), entry.getValue() });
-          }
-
-          Assert.assertTrue(isEqual(idToAddrMap, respMap));
-        } finally {
-          // the pool was previously never shut down, leaving its threads behind after each round
-          executor.shutdown();
-          client.close();
-        }
-      } finally {
-        server.close();
-      }
-    }
-  }
-
-  /**
-   * Builds a task that resolves {@code id} through {@code client} and stores the answer in {@code results}.
-   * A failed lookup is logged and turned into a test failure inside the task.
-   */
-  private Runnable newLookupTask(final NameLookupClient client, final Identifier id,
-                                 final ConcurrentMap<Identifier, InetSocketAddress> results) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        InetSocketAddress addr = null;
-        try {
-          addr = client.lookup(id);
-        } catch (final Exception ex) {
-          LOG.log(Level.SEVERE, "Lookup failed", ex);
-          Assert.fail(ex.toString());
-        }
-        results.put(id, addr);
-      }
-    };
-  }
-
-  /**
-   * NameServer and NameRegistryClient test.
-   * <p>
-   * Registers two names through a {@link NameRegistryClient}, waits until the server reports
-   * both, compares the server's view with what was registered, then unregisters them and
-   * checks that the server no longer returns any assignment. The client and the server are
-   * closed even when a step fails.
-   *
-   * @throws Exception if setup, registration, un-registration or closing fails
-   */
-  @Test
-  public void testNamingRegistry() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    final NameServer server = new NameServer(0, this.factory);
-    try {
-      this.port = server.getPort();
-
-      // names to start with
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-      final Set<Identifier> ids = idToAddrMap.keySet();
-
-      // registration
-      // invoke registration from the client side
-      final NameRegistryClient client = new NameRegistryClient(
-          NetUtils.getLocalAddress(), this.port, this.factory);
-      try {
-        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
-          client.register(entry.getKey(), entry.getValue());
-        }
-
-        // wait until the server returns an assignment for every id
-        busyWait(server, ids.size(), ids);
-
-        // check the server side
-        final Map<Identifier, InetSocketAddress> registered = new HashMap<Identifier, InetSocketAddress>();
-        for (final NameAssignment na : server.lookup(ids)) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
-              new Object[] { na.getIdentifier(), na.getAddress() });
-          registered.put(na.getIdentifier(), na.getAddress());
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, registered));
-
-        // un-registration
-        for (final Identifier id : ids) {
-          client.unregister(id);
-        }
-
-        // wait until the server returns no assignment at all
-        busyWait(server, 0, ids);
-
-        final Map<Identifier, InetSocketAddress> remaining = new HashMap<Identifier, InetSocketAddress>();
-        for (final NameAssignment na : server.lookup(ids)) {
-          remaining.put(na.getIdentifier(), na.getAddress());
-        }
-
-        Assert.assertEquals(0, remaining.size());
-      } finally {
-        client.close();
-      }
-    } finally {
-      server.close();
-    }
-  }
-
-  /**
-   * NameServer and NameClient test.
-   * <p>
-   * Registers two names through a {@link NameClient}, looks both up through the same client,
-   * unregisters them and checks on the server that neither name resolves any more. The
-   * client and the server are closed even when a step fails.
-   *
-   * @throws Exception if setup, registration, lookup, un-registration or closing fails
-   */
-  @Test
-  public void testNameClient() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    final NameServer server = new NameServer(0, this.factory);
-    try {
-      this.port = server.getPort();
-
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-      final Set<Identifier> ids = idToAddrMap.keySet();
-
-      // registration
-      // invoke registration from the client side
-      final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
-          this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
-      try {
-        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
-          client.register(entry.getKey(), entry.getValue());
-        }
-
-        // wait until the server returns an assignment for every id
-        busyWait(server, ids.size(), ids);
-
-        // lookup
-        final Identifier id1 = this.factory.getNewInstance("task1");
-        final Identifier id2 = this.factory.getNewInstance("task2");
-
-        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
-        respMap.put(id1, client.lookup(id1));
-        respMap.put(id2, client.lookup(id2));
-
-        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { entry.getKey(), entry.getValue() });
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
-        // un-registration
-        for (final Identifier id : ids) {
-          client.unregister(id);
-        }
-
-        // wait until the server returns no assignment at all
-        busyWait(server, 0, ids);
-
-        final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
-        final InetSocketAddress leftover1 = server.lookup(id1);
-        if (leftover1 != null) {
-          serverMap.put(id1, leftover1);
-        }
-        // look up id2 here: the previous code queried id1 twice, so task2 was never verified
-        final InetSocketAddress leftover2 = server.lookup(id2);
-        if (leftover2 != null) {
-          serverMap.put(id2, leftover2);
-        }
-
-        Assert.assertEquals(0, serverMap.size());
-      } finally {
-        client.close();
-      }
-    } finally {
-      server.close();
-    }
-  }
-
-  /**
-   * Compares two identifier-to-address maps.
-   *
-   * @param map1 the reference map; each of its values is compared against {@code map2}
-   * @param map2 the map checked against {@code map1}
-   * @return {@code true} when both maps have the same size and every value of {@code map1}
-   *         equals the value {@code map2} holds under the same key
-   */
-  private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
-                          final Map<Identifier, InetSocketAddress> map2) {
-
-    final boolean sameSize = map1.size() == map2.size();
-    if (!sameSize) {
-      return false;
-    }
-
-    for (final Map.Entry<Identifier, InetSocketAddress> expected : map1.entrySet()) {
-      final InetSocketAddress wanted = expected.getValue();
-      final InetSocketAddress actual = map2.get(expected.getKey());
-      if (!wanted.equals(actual)) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Spins until a lookup of {@code ids} on {@code server} yields exactly {@code expected}
-   * name assignments. There is no pause between polls and no upper bound on the wait.
-   *
-   * @param server   the name server to poll
-   * @param expected the number of assignments to wait for
-   * @param ids      the identifiers passed to every lookup
-   */
-  private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
-    while (true) {
-      // count the assignments returned by this poll
-      int found = 0;
-      for (final @SuppressWarnings("unused") NameAssignment na : server.lookup(ids)) {
-        found++;
-      }
-      if (found == expected) {
-        return;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java b/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
deleted file mode 100644
index c1a156a..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.reef.io.network.impl.MessagingTransportFactory;
-import com.microsoft.reef.io.network.impl.NetworkService;
-import com.microsoft.reef.io.network.naming.NameServer;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.reef.services.network.util.Monitor;
-import com.microsoft.reef.services.network.util.StringCodec;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.NetUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Network service test
- */
-public class NetworkServiceTest {
-  /** Logger shared by all tests in this class. */
-  private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());
-
-  /** JUnit rule exposing the name of the running test method; every test logs it on entry. */
-  @Rule
-  public TestName name = new TestName();
-
-  /**
-   * NetworkService messaging test
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testMessagingNetworkService() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    IdentifierFactory factory = new StringIdentifierFactory();
-    String nameServerAddr = NetUtils.getLocalAddress();
-
-    NameServer server = new NameServer(0, factory);
-    int nameServerPort = server.getPort();
-
-    final int numMessages = 10;
-    final Monitor monitor = new Monitor();
-
-    LOG.log(Level.FINEST, "=== Test network service receiver start");
-    // network service
-    final String name2 = "task2";
-    NetworkService<String> ns2 = new NetworkService<String>(
-        factory, 0, nameServerAddr, nameServerPort,
-        new StringCodec(), new MessagingTransportFactory(),
-        new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
-    ns2.registerId(factory.getNewInstance(name2));
-    final int port2 = ns2.getTransport().getListeningPort();
-    server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
-    LOG.log(Level.FINEST, "=== Test network service sender start");
-    final String name1 = "task1";
-    final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameServerAddr, nameServerPort,
-        new StringCodec(), new MessagingTransportFactory(),
-        new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
-    ns1.registerId(factory.getNewInstance(name1));
-    final int port1 = ns1.getTransport().getListeningPort();
-    server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
-    final Identifier destId = factory.getNewInstance(name2);
-    final Connection<String> conn = ns1.newConnection(destId);
-    try {
-      conn.open();
-      for (int count = 0; count < numMessages; ++count) {
-        conn.write("hello! " + count);
-      }
-      monitor.mwait();
-
-    } catch (NetworkException e) {
-      e.printStackTrace();
-    }
-    conn.close();
-
-    ns1.close();
-    ns2.close();
-
-    server.close();
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   * <p>
-   * For each message size a fresh receiver ("task2") and sender ("task1") are started, the
-   * sender writes {@code numMessages} messages of that size, and the time until the receiving
-   * handler has counted them all is logged as messages/s and bytes/s. Send failures now fail
-   * the test instead of being printed and ignored, and the connection, the services and the
-   * name server are closed on every path.
-   *
-   * @throws Exception if setup, sending, waiting or closing fails
-   */
-  @Test
-  public void testMessagingNetworkServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final String nameServerAddr = NetUtils.getLocalAddress();
-
-    final NameServer server = new NameServer(0, factory);
-    try {
-      final int nameServerPort = server.getPort();
-
-      final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
-      for (final int size : messageSizes) {
-        // fewer messages for larger payloads: the count is divided by size / 512 (at least 1)
-        final int numMessages = 300000 / (Math.max(1, size / 512));
-        final Monitor monitor = new Monitor();
-
-        LOG.log(Level.FINEST, "=== Test network service receiver start");
-        // network service
-        final String name2 = "task2";
-        final NetworkService<String> ns2 = new NetworkService<String>(
-            factory, 0, nameServerAddr, nameServerPort,
-            new StringCodec(), new MessagingTransportFactory(),
-            new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
-        try {
-          ns2.registerId(factory.getNewInstance(name2));
-          final int port2 = ns2.getTransport().getListeningPort();
-          server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
-
-          LOG.log(Level.FINEST, "=== Test network service sender start");
-          final String name1 = "task1";
-          final NetworkService<String> ns1 = new NetworkService<String>(
-              factory, 0, nameServerAddr, nameServerPort,
-              new StringCodec(), new MessagingTransportFactory(),
-              new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
-          try {
-            ns1.registerId(factory.getNewInstance(name1));
-            final int port1 = ns1.getTransport().getListeningPort();
-            server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
-
-            final Identifier destId = factory.getNewInstance(name2);
-            final Connection<String> conn = ns1.newConnection(destId);
-            try {
-              // build the message
-              final StringBuilder msb = new StringBuilder(size);
-              for (int i = 0; i < size; i++) {
-                msb.append("1");
-              }
-              final String message = msb.toString();
-
-              final long start = System.currentTimeMillis();
-              for (int i = 0; i < numMessages; i++) {
-                // open() stays inside the loop so the timed region matches the previous benchmark
-                conn.open();
-                conn.write(message);
-              }
-              monitor.mwait();
-              final long end = System.currentTimeMillis();
-
-              final double runtime = ((double) end - start) / 1000;
-              // x2 for unicode chars
-              LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / runtime
-                  + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);
-            } finally {
-              conn.close();
-            }
-          } finally {
-            ns1.close();
-          }
-        } finally {
-          ns2.close();
-        }
-      }
-    } finally {
-      server.close();
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark with disjoint sender/receiver pairs.
-   * <p>
-   * Four pool threads each start their own receiver ("task2-N") and sender ("task1-N"), send
-   * {@code numMessages} messages and wait until their receiver has counted them. The combined
-   * rate is logged once all threads have finished.
-   * <p>
-   * Changes from the earlier version: worker failures are propagated through their futures
-   * instead of being printed and ignored; a pool that does not terminate within 100 seconds
-   * fails the test; the queue that was filled but never read by any worker is gone; and the
-   * clock now starts before the workers are submitted, so the measured time covers their
-   * whole run (including service setup).
-   *
-   * @throws Exception if setup fails, a worker fails, the workers time out or closing fails
-   */
-  @Test
-  public void testMessagingNetworkServiceRateDisjoint() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final String nameServerAddr = NetUtils.getLocalAddress();
-
-    final NameServer server = new NameServer(0, factory);
-    try {
-      final int nameServerPort = server.getPort();
-
-      final int numThreads = 4;
-      final int size = 2000;
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final int totalNumMessages = numMessages * numThreads;
-
-      final ExecutorService e = Executors.newCachedThreadPool();
-      final Future<?>[] workers = new Future<?>[numThreads];
-
-      // start and time
-      final long start = System.currentTimeMillis();
-      for (int t = 0; t < numThreads; t++) {
-        final int tt = t;
-
-        workers[t] = e.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            final Monitor monitor = new Monitor();
-
-            LOG.log(Level.FINEST, "=== Test network service receiver start");
-            // network service
-            final String name2 = "task2-" + tt;
-            final NetworkService<String> ns2 = new NetworkService<String>(
-                factory, 0, nameServerAddr, nameServerPort,
-                new StringCodec(), new MessagingTransportFactory(),
-                new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
-            try {
-              ns2.registerId(factory.getNewInstance(name2));
-              final int port2 = ns2.getTransport().getListeningPort();
-              server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
-
-              LOG.log(Level.FINEST, "=== Test network service sender start");
-              final String name1 = "task1-" + tt;
-              final NetworkService<String> ns1 = new NetworkService<String>(
-                  factory, 0, nameServerAddr, nameServerPort,
-                  new StringCodec(), new MessagingTransportFactory(),
-                  new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
-              try {
-                ns1.registerId(factory.getNewInstance(name1));
-                final int port1 = ns1.getTransport().getListeningPort();
-                server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
-
-                final Identifier destId = factory.getNewInstance(name2);
-                final Connection<String> conn = ns1.newConnection(destId);
-                try {
-                  // build the message
-                  final StringBuilder msb = new StringBuilder(size);
-                  for (int i = 0; i < size; i++) {
-                    msb.append("1");
-                  }
-                  final String message = msb.toString();
-
-                  for (int i = 0; i < numMessages; i++) {
-                    conn.open();
-                    conn.write(message);
-                  }
-                  monitor.mwait();
-                } finally {
-                  conn.close();
-                }
-              } finally {
-                ns1.close();
-              }
-            } finally {
-              ns2.close();
-            }
-            return null;
-          }
-        });
-      }
-
-      e.shutdown();
-      if (!e.awaitTermination(100, TimeUnit.SECONDS)) {
-        // interrupt the stuck workers and report the timeout instead of logging a bogus rate
-        e.shutdownNow();
-        throw new TimeoutException("Messaging workers did not finish within 100 seconds");
-      }
-      final long end = System.currentTimeMillis();
-
-      // every worker has finished; get() only rethrows a worker failure, if there was one
-      for (final Future<?> worker : workers) {
-        worker.get();
-      }
-
-      final double runtime = ((double) end - start) / 1000;
-      // x2 for unicode chars
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime
-          + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);
-    } finally {
-      server.close();
-    }
-  }
-
-  @Test
-  public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    IdentifierFactory factory = new StringIdentifierFactory();
-    String nameServerAddr = NetUtils.getLocalAddress();
-
-    NameServer server = new NameServer(0, factory);
-    int nameServerPort = server.getPort();
-
-    final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
-
-    for (int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      int numThreads = 2;
-      int totalNumMessages = numMessages * numThreads;
-      final Monitor monitor = new Monitor();
-
-      LOG.log(Level.FINEST, "=== Test network service receiver start");
-      // network service
-      final String name2 = "task2";
-      NetworkService<String> ns2 = new NetworkService<String>(
-          factory, 0, nameServerAddr, nameServerPort,
-          new StringCodec(), new MessagingTransportFactory(),
-          new MessageHandler<String>(name2, monitor, totalNumMessages), new ExceptionHandler());
-      ns2.registerId(factory.getNewInstance(name2));
-      final int port2 = ns2.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
-      LOG.log(Level.FINEST, "=== Test network service sender start");
-      final String name1 = "task1";
-      NetworkService<String> ns1 = new NetworkService<String>(
-          factory, 0, nameServerAddr, nameServerPort,
-          new StringCodec(), new MessagingTransportFactory(),
-          new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
-      ns1.registerId(factory.getNewInstance(name1));
-      final int port1 = ns1.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
-      Identifier destId = factory.getNewInstance(name2);
-      final Connection<String> conn = ns1.newConnection(destId);
-      conn.open();
-
-      // build the message
-      StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      final String message = msb.toString();
-
-      ExecutorService e = Executors.newCachedThreadPool();
-
-      long start = System.currentTimeMillis();
-      for (int i = 0; i < numThreads; i++) {
-        e.submit(new Runnable() {
-
-          @Override
-          public void run() {
-            try {
-              for (int i = 0; i < numMessages; i++) {
-                conn.write(message);
-              }
-            } catch (NetworkException e) {
-              e.printStackTrace();
-            }
-          }
-        });
-      }
-
-
-      e.shutdown();
-      e.awaitTermination(30, TimeUnit.SECONDS);
-      monitor.mwait();
-
-      long end = System.currentTimeMillis();
-      double runtime = ((double) end - start) / 1000;
-
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
-      conn.close();
-
-      ns1.close();
-      ns2.close();
-    }
-
-    server.close();
-  }
-
-  /**
-   * NetworkService messaging rate benchmark with batched payloads.
-   * <p>
-   * For each message size the sender ("task1") writes {@code numMessages} batches to the
-   * receiver ("task2"); every batch is the size-character message repeated
-   * {@code batchSize / size} times. The time until the receiving handler has counted all
-   * batches is logged as application messages/s and bytes/s. Send failures now fail the test
-   * instead of being printed and ignored, and the connection, the services and the name
-   * server are closed on every path.
-   *
-   * @throws Exception if setup, sending, waiting or closing fails
-   */
-  @Test
-  public void testMessagingNetworkServiceBatchingRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final String nameServerAddr = NetUtils.getLocalAddress();
-
-    final NameServer server = new NameServer(0, factory);
-    try {
-      final int nameServerPort = server.getPort();
-
-      final int batchSize = 1024 * 1024;
-      final int[] messageSizes = {32, 64, 512};
-
-      for (final int size : messageSizes) {
-        final int numMessages = 300 / (Math.max(1, size / 512));
-        final Monitor monitor = new Monitor();
-
-        LOG.log(Level.FINEST, "=== Test network service receiver start");
-        // network service
-        final String name2 = "task2";
-        final NetworkService<String> ns2 = new NetworkService<String>(
-            factory, 0, nameServerAddr, nameServerPort,
-            new StringCodec(), new MessagingTransportFactory(),
-            new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
-        try {
-          ns2.registerId(factory.getNewInstance(name2));
-          final int port2 = ns2.getTransport().getListeningPort();
-          server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
-
-          LOG.log(Level.FINEST, "=== Test network service sender start");
-          final String name1 = "task1";
-          final NetworkService<String> ns1 = new NetworkService<String>(
-              factory, 0, nameServerAddr, nameServerPort,
-              new StringCodec(), new MessagingTransportFactory(),
-              new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
-          try {
-            ns1.registerId(factory.getNewInstance(name1));
-            final int port1 = ns1.getTransport().getListeningPort();
-            server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
-
-            final Identifier destId = factory.getNewInstance(name2);
-            final Connection<String> conn = ns1.newConnection(destId);
-            try {
-              // build the message
-              final StringBuilder msb = new StringBuilder(size);
-              for (int i = 0; i < size; i++) {
-                msb.append("1");
-              }
-              final String message = msb.toString();
-
-              final long start = System.currentTimeMillis();
-              for (int i = 0; i < numMessages; i++) {
-                // the batch is rebuilt for every send, inside the timed region, as before
-                final StringBuilder sb = new StringBuilder();
-                for (int j = 0; j < batchSize / size; j++) {
-                  sb.append(message);
-                }
-                conn.open();
-                conn.write(sb.toString());
-              }
-              monitor.mwait();
-              final long end = System.currentTimeMillis();
-
-              final double runtime = ((double) end - start) / 1000;
-              // widen before multiplying so larger counts or batch sizes cannot overflow int
-              final long numAppMessages = (long) numMessages * batchSize / size;
-              // x2 for unicode chars
-              LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime
-                  + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);
-            } finally {
-              conn.close();
-            }
-          } finally {
-            ns1.close();
-          }
-        } finally {
-          ns2.close();
-        }
-      }
-    } finally {
-      server.close();
-    }
-  }
-
-  /**
-   * Test message handler: counts incoming messages and notifies the monitor when the count
-   * reaches the expected number.
-   *
-   * @param <T> type
-   */
-  class MessageHandler<T> implements EventHandler<Message<T>> {
-
-    private final String name;
-    private final int expected;
-    private final Monitor monitor;
-    private final AtomicInteger count = new AtomicInteger(0);
-
-    /**
-     * @param name     label of the service this handler belongs to
-     * @param monitor  monitor to notify; the tests pass {@code null} together with
-     *                 {@code expected == 0} for handlers that should never notify
-     * @param expected number of messages after which the monitor is notified
-     */
-    MessageHandler(String name, Monitor monitor, int expected) {
-      this.name = name;
-      this.monitor = monitor;
-      this.expected = expected;
-    }
-
-    @Override
-    public void onNext(Message<T> value) {
-      // Use the value returned by the increment itself. Re-reading the counter afterwards let
-      // a concurrent increment slip in between, so the notification could be skipped or sent twice.
-      final int received = count.incrementAndGet();
-
-      // NOTE(review): the loop body is intentionally empty; the iteration is kept in case
-      // walking the payload has an effect (e.g. decoding) - confirm and drop it if not.
-      for (T obj : value.getData()) {
-      }
-
-      if (received == expected) {
-        monitor.mnotify();
-      }
-    }
-  }
-
-  /**
-   * Test exception handler
-   */
-  class ExceptionHandler implements EventHandler<Exception> {
-    @Override
-    public void onNext(Exception error) {
-      System.err.println(error);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java b/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
deleted file mode 100644
index 867a650..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import java.io.Serializable;
-
-/**
- * Event for testing
- */
-public class TestEvent implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private String message;
-
-  public TestEvent(String message) {
-    this.message = message;
-  }
-  
-  public String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
deleted file mode 100644
index 43c661d..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class LoggingUtils {
-  public static void setLoggingLevel(Level level) {
-    Handler[] handlers = Logger.getLogger("").getHandlers();
-    ConsoleHandler ch = null;
-    for (Handler h : handlers) {
-      if (h instanceof ConsoleHandler) {
-        ch = (ConsoleHandler)h;
-        break;
-      }
-    }
-    if (ch == null) {
-      ch = new ConsoleHandler();
-      Logger.getLogger("").addHandler(ch);
-    }
-    ch.setLevel(level);
-    Logger.getLogger("").setLevel(level);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
deleted file mode 100644
index 1d7c901..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Monitor {
-  private AtomicBoolean finished = new AtomicBoolean(false);
-    
-  public void mwait() throws InterruptedException {
-    synchronized(this) {
-      while(!finished.get()) 
-        this.wait();
-    }
-  }
-    
-  public void mnotify() {
-    synchronized(this) {
-      finished.compareAndSet(false, true);
-      this.notifyAll();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
deleted file mode 100644
index f22defb..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import com.microsoft.wake.remote.Codec;
-
-
-public class StringCodec implements Codec<String> {
-  @Override
-  public byte[] encode(String obj) {
-    return obj.getBytes();
-  }
-
-  @Override
-  public String decode(byte[] buf) {
-    return new String(buf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
deleted file mode 100644
index 186797c..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.impl.PeriodicEvent;
-
-public class TimeoutHandler implements EventHandler<PeriodicEvent> {
-  
-  private final Monitor monitor;
-  
-  public TimeoutHandler(Monitor monitor) {
-    this.monitor = monitor;
-  }
-  
-  @Override
-  public void onNext(PeriodicEvent event) {
-    monitor.mnotify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
deleted file mode 100644
index d806bb4..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
deleted file mode 100644
index 04b41f6..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.io.ExternalMap;
-import com.microsoft.reef.io.serialization.Codec;
-import com.microsoft.reef.io.storage.ram.CodecRamMap;
-import com.microsoft.reef.io.storage.ram.RamMap;
-import com.microsoft.reef.io.storage.ram.RamStorageService;
-import com.microsoft.reef.io.storage.util.IntegerCodec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-
-public class ExternalMapTest {
-  @Test
-  public void testCodecRamMap() {
-    RamStorageService ramStore = new RamStorageService();
-    Codec<Integer> c = new IntegerCodec();
-    ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c);
-    genericTest(m);
-  }
-  @Test
-  public void testRamMap() {
-    RamStorageService ramStore = new RamStorageService();
-    ExternalMap<Integer> m = new RamMap<>(ramStore);
-    genericTest(m);
-  }
-
-  
-  void genericTest(ExternalMap<Integer> m) {
-    m.put("foo", 42);
-    Map<String, Integer> smallMap = new HashMap<>();
-    smallMap.put("bar", 43);
-    smallMap.put("baz", 44);
-    
-    m.putAll(smallMap);
-    
-    Assert.assertEquals(44, (int)m.get("baz"));
-    Assert.assertEquals(43, (int)m.get("bar"));
-    Assert.assertEquals(42, (int)m.get("foo"));
-    Assert.assertNull(m.get("quuz"));
-    
-    Assert.assertTrue(m.containsKey("bar"));
-    Assert.assertFalse(m.containsKey("quuz"));
-
-    Set<String> barBaz = new HashSet<>();
-    barBaz.add("bar");
-    barBaz.add("baz");
-    barBaz.add("quuz");
-
-    Iterable<Map.Entry<CharSequence,Integer>> it = m.getAll(barBaz);
-
-    Map<CharSequence, Integer> found = new TreeMap<>();
-    
-    for(Map.Entry<CharSequence, Integer> e: it) {
-      found.put(e.getKey(), e.getValue());
-    }
-    Iterator<CharSequence> it2 = found.keySet().iterator();
-    Assert.assertTrue(it2.hasNext());
-    CharSequence s = it2.next();
-    Assert.assertEquals(s, "bar");
-    Assert.assertEquals((int)found.get(s), 43);
-    Assert.assertTrue(it2.hasNext());
-    s = it2.next();
-    Assert.assertEquals(s, "baz");
-    Assert.assertEquals((int)found.get(s), 44);
-    Assert.assertFalse(it2.hasNext());
-    
-    Assert.assertEquals(44, (int)m.remove("baz"));
-    Assert.assertFalse(m.containsKey("baz"));
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
deleted file mode 100644
index 5713732..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.storage.FramingInputStream;
-import com.microsoft.reef.io.storage.FramingOutputStream;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
-public class FramingTest {
-
-  @Test
-  public void frameRoundTripTest() throws IOException, ServiceException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
-    FramingOutputStream o = new FramingOutputStream(baos);
-    FramingOutputStream o2 = new FramingOutputStream(baos2);
-    Accumulator<byte[]> a = o2.accumulator();
-    int offset = 0;
-    for(int i = 0; i < 256; i++ ) {
-      byte[] b = new byte[i];
-      Arrays.fill(b, (byte)i);
-      o.write(b);
-      if(i == 255) {
-        o.close();
-      } else {
-        o.nextFrame();
-      }
-      offset += (4 + i);
-      Assert.assertEquals(offset, o.getCurrentOffset());
-      a.add(b);
-      Assert.assertEquals(offset, o2.getCurrentOffset());
-    }
-    a.close();
-    o2.close();
-    byte[] b1 = baos.toByteArray();
-    byte[] b2 = baos2.toByteArray();
-    Assert.assertArrayEquals(b1,  b2);
-    FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1));
-    FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2));
-    for(int i = 0; i <= 256; i++ ) {
-      byte[] b = new byte[i];
-      Arrays.fill(b, (byte)i);
-      byte[] f = inA1.readFrame();
-      byte[] g = inA2.readFrame();
-      if(i == 256) {
-        Assert.assertNull(f);
-        Assert.assertNull(g);
-      } else {
-        Assert.assertArrayEquals(b, f);
-        Assert.assertArrayEquals(b, g);
-      }
-    }
-    inA2.close();
-    inA1.close();
-
-    FramingInputStream inB1 = new FramingInputStream(new ByteArrayInputStream(b1));
-    int i = 0;
-    for(byte[] bin : inB1) {
-      byte[] b = new byte[i];
-      Arrays.fill(b, (byte)i);
-      Assert.assertArrayEquals(b, bin);
-      i++;
-    }
-    Assert.assertEquals(256, i);
-    inB1.close();
-
-    FramingInputStream inB2 = new FramingInputStream(new ByteArrayInputStream(b2));
-    i = 0;
-    for(byte[] bin : inB2) {
-      byte[] b = new byte[i];
-      Arrays.fill(b, (byte)i);
-      Assert.assertArrayEquals(b, bin);
-      i++;
-    }
-    Assert.assertEquals(256, i);
-    inB2.close();
-    Assert.assertArrayEquals(b1, b2);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
deleted file mode 100644
index 0c920c7..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.io.storage.MergingIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-
-public class MergingIteratorTest {
-
-  @Test
-  public void testMergingIterator() {
-    Comparator<Integer> cmp = new Comparator<Integer>() {
-
-      @Override
-      public int compare(Integer o1, Integer o2) {
-        return Integer.compare(o1, o2);
-      }
-    };
-    @SuppressWarnings("unchecked")
-    Iterator<Integer>[] its = new Iterator[]{
-        Arrays.asList(new Integer[]{1, 4, 7, 10}).iterator(),
-        Arrays.asList(new Integer[]{2, 5, 8, 11}).iterator(),
-        Arrays.asList(new Integer[]{3, 6, 9, 12}).iterator()
-    };
-    MergingIterator<Integer> merge = new MergingIterator<Integer>(cmp, its);
-    int i = 1;
-    while (merge.hasNext()) {
-      Assert.assertEquals(i, (int) merge.next());
-      i++;
-    }
-    Assert.assertEquals(13, i);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
deleted file mode 100644
index 759a0f3..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Spool;
-import com.microsoft.reef.io.storage.ram.SortingRamSpool;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class SortingSpoolTest {
-
-  @Test
-  public void testRamSpool() throws ServiceException {
-    genericTest(new SortingRamSpool<Integer>(), new Comparator<Integer>() {
-
-      @Override
-      public int compare(Integer o1, Integer o2) {
-        return Integer.compare(o1, o2);
-      }
-
-    });
-  }
-
-  @Test
-  public void testRamSpoolComparator() throws ServiceException {
-    Comparator<Integer> backwards = new Comparator<Integer>() {
-
-      @Override
-      public int compare(Integer o1, Integer o2) {
-        return -1 * o1.compareTo(o2);
-      }
-
-    };
-    genericTest(new SortingRamSpool<Integer>(backwards), backwards);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testRamSpoolAddAfterClose() throws ServiceException {
-    Spool<Integer> s = new SortingRamSpool<>();
-    genericAddAfterCloseTest(s);
-  }
-  @Test(expected = UnsupportedOperationException.class)
-  public void testRamSpoolCantRemove() throws ServiceException {
-    Spool<Integer> s = new SortingRamSpool<>();
-    genericCantRemove(s);
-  }
-  @Test(expected = IllegalStateException.class)
-  public void testIteratorBeforeClose() throws ServiceException {
-    Spool<Integer> s = new SortingRamSpool<>();
-    genericIteratorBeforeClose(s);
-  }
-
-  void genericTest(Spool<Integer> s, Comparator<Integer> comparator)
-      throws ServiceException {
-    List<Integer> l = new ArrayList<Integer>();
-    Random r = new Random(42);
-    while (l.size() < 100) {
-      l.add(r.nextInt(75));
-    }
-    Accumulator<Integer> a = s.accumulator();
-    for (int i = 0; i < 100; i++) {
-      a.add(l.get(i));
-    }
-    a.close();
-    List<Integer> m = new ArrayList<Integer>();
-    for (int i : s) {
-      m.add(i);
-    }
-    Integer[] sorted = l.toArray(new Integer[0]);
-    Arrays.sort(sorted, 0, sorted.length, comparator);
-    Integer[] shouldBeSorted = m.toArray(new Integer[0]);
-    Assert.assertArrayEquals(sorted, shouldBeSorted);
-  }
-
-  void genericAddAfterCloseTest(Spool<?> s) throws ServiceException {
-    Accumulator<?> a = s.accumulator();
-    a.close();
-    a.add(null);
-  }
-
-  void genericCantRemove(Spool<Integer> s) throws ServiceException {
-    Accumulator<Integer> a = s.accumulator();
-    a.add(10);
-    a.close();
-    Iterator<?> it = s.iterator();
-    it.remove();
-  }
-
-  void genericIteratorBeforeClose(Spool<Integer> s) throws ServiceException {
-    Accumulator<Integer> a = s.accumulator();
-    a.add(10);
-    s.iterator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
deleted file mode 100644
index 057bcf0..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulable;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Spool;
-import com.microsoft.reef.io.serialization.Codec;
-import com.microsoft.reef.io.serialization.Deserializer;
-import com.microsoft.reef.io.serialization.Serializer;
-import com.microsoft.reef.io.storage.local.CodecFileAccumulable;
-import com.microsoft.reef.io.storage.local.CodecFileIterable;
-import com.microsoft.reef.io.storage.local.LocalStorageService;
-import com.microsoft.reef.io.storage.local.SerializerFileSpool;
-import com.microsoft.reef.io.storage.ram.RamSpool;
-import com.microsoft.reef.io.storage.ram.RamStorageService;
-import com.microsoft.reef.io.storage.util.IntegerCodec;
-import com.microsoft.tang.ConfigurationBuilder;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.exceptions.BindException;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.tang.formats.AvroConfigurationSerializer;
-import com.microsoft.tang.formats.ConfigurationModule;
-import com.microsoft.tang.formats.ConfigurationModuleBuilder;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Iterator;
-
-public class SpoolFileTest {
-  public static final class RamConf extends ConfigurationModuleBuilder {
-    public static final ConfigurationModule CONF = new RamConf()
-        .bindImplementation(RamStorageService.class, RamStorageService.class)
-        .bindImplementation(Spool.class, RamSpool.class)
-        .build();
-  }
-
-  @Test
-  public void testRam() throws BindException, InjectionException, ServiceException, IOException {
-    final Tang t = Tang.Factory.getTang();
-    final ConfigurationBuilder configurationBuilderOne = t.newConfigurationBuilder(RamConf.CONF.build());
-
-    final AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
-    final String serializedConfiguration = serializer.toString(configurationBuilderOne.build());
-    final ConfigurationBuilder configurationBuilderTwo = t.newConfigurationBuilder(serializer.fromString(serializedConfiguration));
-
-    @SuppressWarnings("unchecked")
-    final Spool<Integer> f = (Spool<Integer>) t.newInjector(configurationBuilderTwo.build()).getInstance(
-        Spool.class);
-    test(f);
-  }
-
-  private final Serializer<Integer, OutputStream> serializer = new Serializer<Integer, OutputStream>() {
-    @Override
-    public Accumulable<Integer> create(final OutputStream out) {
-      return new Accumulable<Integer>() {
-
-        @Override
-        public Accumulator<Integer> accumulator() {
-          return new Accumulator<Integer>() {
-
-            @Override
-            public void add(Integer datum) {
-              try {
-                int d = datum;
-                out.write(new byte[]{(byte) (d >>> 24), (byte) (d >>> 16),
-                    (byte) (d >>> 8), (byte) d});
-              } catch (IOException e) {
-                throw new IllegalStateException(e);
-              }
-            }
-
-            @Override
-            public void close() {
-              try {
-                out.flush();
-              } catch (IOException e) {
-                throw new IllegalStateException(e);
-              }
-            }
-          };
-        }
-      };
-    }
-  };
-
-  private final Deserializer<Integer, InputStream> deserializer = new Deserializer<Integer, InputStream>() {
-    @Override
-    public Iterable<Integer> create(final InputStream in) {
-      return new Iterable<Integer>() {
-        @Override
-        public Iterator<Integer> iterator() {
-          Iterator<Integer> it = new Iterator<Integer>() {
-            final byte[] inb = new byte[4];
-            Integer nextInt;
-
-            @Override
-            public boolean hasNext() {
-              return nextInt != null;
-            }
-
-            private void prime() {
-              int read;
-              try {
-                read = in.read(inb);
-              } catch (IOException e) {
-                throw new IllegalStateException(e);
-              }
-              if (read != 4) {
-                nextInt = null;
-              } else {
-                nextInt = ((inb[0] & 0xFF) << 24) + ((inb[1] & 0xFF) << 16)
-                    + ((inb[2] & 0xFF) << 8) + (inb[3] & 0xFF);
-              }
-
-            }
-
-            @Override
-            public Integer next() {
-              Integer ret = nextInt;
-              prime();
-              return ret;
-            }
-
-            @Override
-            public void remove() {
-              throw new UnsupportedOperationException();
-            }
-          };
-          it.next(); // calls prime
-          return it;
-        }
-      };
-    }
-  };
-
-  @Test
-  public void testFile() throws ServiceException {
-    LocalStorageService service = new LocalStorageService("spoolTest", "file");
-    Spool<Integer> f = new SerializerFileSpool<Integer>(service, serializer,
-        deserializer);
-    test(f);
-    service.getScratchSpace().delete();
-  }
-
-  @Test
-  public void testInterop() throws ServiceException {
-    LocalStorageService service = new LocalStorageService("spoolTest", "file");
-    Codec<Integer> c = new IntegerCodec();
-
-
-    CodecFileAccumulable<Integer, Codec<Integer>> f = new CodecFileAccumulable<Integer, Codec<Integer>>(
-        service, c);
-    CodecFileIterable<Integer, Codec<Integer>> g = new CodecFileIterable<Integer, Codec<Integer>>(
-        new File(f.getName()), c);
-    test(f, g);
-    service.getScratchSpace().delete();
-  }
-
-  protected void test(Spool<Integer> f) throws ServiceException {
-    test(f, f);
-  }
-
-  protected void test(Accumulable<Integer> f, Iterable<Integer> g) throws ServiceException {
-
-    try (Accumulator<Integer> acc = f.accumulator()) {
-      for (int i = 0; i < 1000; i++) {
-        acc.add(i);
-      }
-    }
-    int i = 0;
-    for (int j : g) {
-      Assert.assertEquals(i, j);
-      i++;
-    }
-    Iterator<Integer> itA = g.iterator();
-    Iterator<Integer> itB = g.iterator();
-
-    for (i = 0; i < 1000; i++) {
-      Assert.assertEquals((int) itA.next(), i);
-      Assert.assertEquals((int) itB.next(), i);
-    }
-    Assert.assertFalse(itA.hasNext());
-    Assert.assertFalse(itB.hasNext());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
deleted file mode 100644
index b112271..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Tuple;
-import com.microsoft.reef.io.serialization.Deserializer;
-import com.microsoft.reef.io.serialization.Serializer;
-import com.microsoft.reef.io.storage.FramingTupleDeserializer;
-import com.microsoft.reef.io.storage.FramingTupleSerializer;
-import com.microsoft.reef.io.storage.util.IntegerDeserializer;
-import com.microsoft.reef.io.storage.util.IntegerSerializer;
-import com.microsoft.reef.io.storage.util.StringDeserializer;
-import com.microsoft.reef.io.storage.util.StringSerializer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-public class TupleSerializerTest {
-
-  private Serializer<Integer, OutputStream> keySerializer;
-  private Serializer<String, OutputStream> valSerializer;
-  private Deserializer<Integer, InputStream> keyDeserializer;
-  private Deserializer<String, InputStream> valDeserializer;
-  private FramingTupleSerializer<Integer, String> fts;
-  private ByteArrayOutputStream baos;
-  private FramingTupleDeserializer<Integer, String> ftd;
-  private Iterable<Tuple<Integer, String>> iterable;
-
-  @Before
-  public void setup() throws ServiceException {
-
-    keySerializer = new IntegerSerializer();
-    valSerializer = new StringSerializer();
-    keyDeserializer = new IntegerDeserializer();
-    valDeserializer = new StringDeserializer();
-
-    fts = new FramingTupleSerializer<Integer, String>(
-        keySerializer, valSerializer);
-    
-    baos = new ByteArrayOutputStream();
-    Accumulator<Tuple<Integer,String>> acc = fts.create(baos).accumulator();
-    for(int i = 0; i < 100; i++) {
-      acc.add(new Tuple<>(i, i+""));
-    }
-    acc.close();
-
-    ftd = new FramingTupleDeserializer<Integer, String>(
-        keyDeserializer, valDeserializer);
-    iterable = ftd.create(new ByteArrayInputStream(baos.toByteArray()));
-  }
-
-  @Test
-  public void testFramingSerializer() throws ServiceException, IOException {
-    int i = 0;
-    for (Tuple<Integer, String> t : iterable) {
-      Tuple<Integer, String> u = new Tuple<>(i, i + "");
-      Assert.assertEquals(u, t);
-      i++;
-    }
-    Assert.assertEquals(100, i);
-  }
-
-  @Test(expected = NoSuchElementException.class)
-  public void testReadOffEnd() {
-    Iterator<Tuple<Integer, String>> it = iterable.iterator();
-    try {
-      while (it.hasNext()) {
-        it.next();
-        it.hasNext();
-      }
-    } catch (NoSuchElementException e) {
-      throw new IllegalStateException("Errored out too early!", e);
-    }
-    it.next();
-  }
-  @Test(expected = UnsupportedOperationException.class)
-  public void testCantRemove() {
-    Iterator<Tuple<Integer, String>> it = iterable.iterator();
-    it.next();
-    it.remove();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
new file mode 100644
index 0000000..8c570bb
--- /dev/null
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.NameCache;
+import org.apache.reef.io.network.naming.NameClient;
+import org.apache.reef.io.network.naming.NameLookupClient;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class NameClientTest {
+
+  /** Retry settings handed to every NameClient below; resolved from Tang in the static initializer. */
+  static int retryCount, retryTimeout;
+
+  static {
+    final Tang tang = Tang.Factory.getTang();
+    try {
+      retryCount = tang.newInjector().getNamedInstance(NameLookupClient.RetryCount.class);
+      retryTimeout = tang.newInjector().getNamedInstance(NameLookupClient.RetryTimeout.class);
+    } catch (final InjectionException ex) {
+      throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", ex);
+    }
+  }
+
+  /** Class-level set-up hook; currently does nothing. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  /** Class-level tear-down hook; currently does nothing. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  /**
+   * Registers and unregisters a single id, then relies on try-with-resources to exercise
+   * {@link org.apache.reef.io.network.naming.NameClient#close()}.
+   *
+   * @throws Exception if anything in the register/unregister/close sequence fails
+   */
+  @Test
+  public final void testClose() throws Exception {
+    final IdentifierFactory idFactory = new StringIdentifierFactory();
+    try (final NameServer nameServer = new NameServer(0, idFactory)) {
+      final int port = nameServer.getPort();
+      try (final NameClient nameClient = new NameClient(
+          NetUtils.getLocalAddress(), port, idFactory, retryCount, retryTimeout, new NameCache(10000))) {
+        final Identifier taskId = idFactory.getNewInstance("Task1");
+        final InetSocketAddress taskAddr = new InetSocketAddress(NetUtils.getLocalAddress(), 7001);
+        nameClient.register(taskId, taskAddr);
+        nameClient.unregister(taskId);
+        Thread.sleep(100); // pause before the client and the server are closed
+      }
+    }
+  }
+
+  /**
+   * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup}.
+   * Checks the cache expiry policy: the test is written for expireAfterWrite, and
+   * switching NameCache to expireAfterAccess causes it to fail.
+   *
+   * @throws Exception if the final lookup fails with anything other than an ExecutionException
+   */
+  @Test
+  public final void testLookup() throws Exception {
+    final IdentifierFactory idFactory = new StringIdentifierFactory();
+    try (final NameServer nameServer = new NameServer(0, idFactory)) {
+      final int port = nameServer.getPort();
+      try (final NameClient nameClient = new NameClient(
+          NetUtils.getLocalAddress(), port, idFactory, retryCount, retryTimeout, new NameCache(150))) {
+        final Identifier taskId = idFactory.getNewInstance("Task1");
+        nameClient.register(taskId, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+        nameClient.lookup(taskId); // first lookup puts the entry in the cache
+        nameClient.unregister(taskId);
+        Thread.sleep(100);
+        try {
+          nameClient.lookup(taskId);
+          Thread.sleep(100);
+          // Under expireAfterAccess the lookup above would push expiry out by another 150ms,
+          // so after this 100ms wait the cached value would still be returned.
+          // Under expireAfterWrite the extra 100ms wait expires the entry, the lookup
+          // ends in a NamingException and the test passes.
+          final InetSocketAddress resolved = nameClient.lookup(taskId);
+          Assert.assertNull("client.lookup(id)", resolved);
+        } catch (final Exception ex) {
+          if (!(ex instanceof ExecutionException)) {
+            throw ex;
+          }
+          Assert.assertTrue("Execution Exception cause is instanceof NamingException",
+              ex.getCause() instanceof NamingException);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
new file mode 100644
index 0000000..a706196
--- /dev/null
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -0,0 +1,364 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server and client test
+ */
+public class NamingTest {
+
+  private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
+  /** Retry settings for the lookup clients, resolved from a fresh Tang injector below. */
+  private static final int retryCount;
+  private static final int retryTimeout;
+  static {
+    try {
+      final Injector defaults = Tang.Factory.getTang().newInjector();
+      retryCount = defaults.getNamedInstance(NameLookupClient.RetryCount.class);
+      retryTimeout = defaults.getNamedInstance(NameLookupClient.RetryTimeout.class);
+    } catch (final InjectionException e) {
+      final String message = "Exception while trying to find default values for retryCount & Timeout";
+      LOG.log(Level.SEVERE, message, e);
+      throw new RuntimeException(message, e);
+    }
+  }
+
+  @Rule
+  public final TestName name = new TestName(); // supplies the running test's name for logging
+  final long TTL = 30000; // passed to every NameCache created by the tests
+  final IdentifierFactory factory = new StringIdentifierFactory();
+  int port; // port of the most recently started NameServer
+
+  /**
+   * NameServer and NameLookupClient test: two names registered directly on the server
+   * must be resolved to the same addresses by a lookup client.
+   *
+   * @throws Exception if the server, the client, or a lookup fails
+   */
+  @Test
+  public void testNamingLookup() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // names
+    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+    // run a server; try-with-resources closes it even when an assertion below fails
+    try (final NameServer server = new NameServer(0, this.factory)) {
+      this.port = server.getPort();
+      for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+        server.register(entry.getKey(), entry.getValue());
+      }
+
+      // run a client
+      final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+          10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+      try {
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+        respMap.put(id1, client.lookup(id1));
+        respMap.put(id2, client.lookup(id2));
+
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+      } finally {
+        client.close(); // runs before the enclosing try-with-resources closes the server
+      }
+    }
+  }
+
+  /**
+   * Test concurrent lookups (threads share a client). Every round starts its own server,
+   * client and thread pool, and releases all three even when the round fails.
+   *
+   * @throws Exception if the server, the client, or one of the lookup tasks fails
+   */
+  @Test
+  public void testConcurrentNamingLookup() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // test it 3 times to make failure likely
+    for (int i = 0; i < 3; i++) {
+
+      LOG.log(Level.FINEST, "test {0}", i);
+
+      // names
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+      idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
+
+      // run a server; try-with-resources closes it even when the round fails
+      try (final NameServer server = new NameServer(0, this.factory)) {
+        this.port = server.getPort();
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          server.register(entry.getKey(), entry.getValue());
+        }
+
+        // run a client and the thread pool whose workers share it
+        final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+            10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+          final Identifier id1 = this.factory.getNewInstance("task1");
+          final Identifier id2 = this.factory.getNewInstance("task2");
+          final Identifier id3 = this.factory.getNewInstance("task3");
+
+          final ConcurrentMap<Identifier, InetSocketAddress> respMap =
+              new ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+          // one task per id; a lookup that throws is logged and fails its future
+          final Future<?> f1 = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                respMap.put(id1, client.lookup(id1));
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+            }
+          });
+          final Future<?> f2 = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                respMap.put(id2, client.lookup(id2));
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+            }
+          });
+          final Future<?> f3 = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                respMap.put(id3, client.lookup(id3));
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+            }
+          });
+
+          // get() rethrows a task failure wrapped in an ExecutionException
+          f1.get();
+          f2.get();
+          f3.get();
+
+          for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+          }
+
+          Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        } finally {
+          // interrupt any worker that is still running, then close the client they share
+          executor.shutdownNow();
+          client.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * NameServer and NameRegistryClient test: names registered through the client must show up
+   * on the server, and must be gone again once they are unregistered.
+   *
+   * @throws Exception if the server, the client, or a registry call fails
+   */
+  @Test
+  public void testNamingRegistry() throws Exception {
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // names to start with
+    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+    // try-with-resources closes the server even when an assertion below fails
+    try (final NameServer server = new NameServer(0, this.factory)) {
+      this.port = server.getPort();
+
+      // registration, invoked from the client side
+      final NameRegistryClient client = new NameRegistryClient(
+          NetUtils.getLocalAddress(), this.port, this.factory);
+      try {
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          client.register(entry.getKey(), entry.getValue());
+        }
+
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // check the server side
+        final Map<Identifier, InetSocketAddress> registered = new HashMap<Identifier, InetSocketAddress>();
+        for (final NameAssignment na : server.lookup(ids)) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+              new Object[]{na.getIdentifier(), na.getAddress()});
+          registered.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, registered));
+
+        // un-registration
+        for (final Identifier id : ids) {
+          client.unregister(id);
+        }
+
+        // wait
+        busyWait(server, 0, ids);
+
+        final Map<Identifier, InetSocketAddress> leftover = new HashMap<Identifier, InetSocketAddress>();
+        for (final NameAssignment na : server.lookup(ids)) {
+          leftover.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertEquals(0, leftover.size());
+      } finally {
+        client.close(); // runs before the enclosing try-with-resources closes the server
+      }
+    }
+  }
+
+  /**
+   * NameServer and NameClient test: names registered through the client must be resolvable
+   * through it, and must be gone from the server once they are unregistered.
+   *
+   * @throws Exception if the server, the client, or a naming call fails
+   */
+  @Test
+  public void testNameClient() throws Exception {
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+    // server and client are closed (client first) even when an assertion below fails
+    try (final NameServer server = new NameServer(0, this.factory)) {
+      this.port = server.getPort();
+
+      // registration, invoked from the client side
+      try (final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
+          this.factory, retryCount, retryTimeout, new NameCache(this.TTL))) {
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          client.register(entry.getKey(), entry.getValue());
+        }
+
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // lookup
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+        respMap.put(id1, client.lookup(id1));
+        respMap.put(id2, client.lookup(id2));
+
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+        // un-registration
+        for (final Identifier id : ids) {
+          client.unregister(id);
+        }
+
+        // wait
+        busyWait(server, 0, ids);
+
+        // every id is checked on its own: none may still resolve on the server
+        final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+        for (final Identifier id : new Identifier[]{id1, id2}) {
+          final InetSocketAddress addr = server.lookup(id);
+          if (addr != null) {
+            serverMap.put(id, addr);
+          }
+        }
+
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
+  }
+
+  /**
+   * Tells whether map2 has the same size as map1 and maps every key of map1 to an equal address.
+   */
+  private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
+                          final Map<Identifier, InetSocketAddress> map2) {
+    boolean same = map1.size() == map2.size();
+    if (same) {
+      for (final Map.Entry<Identifier, InetSocketAddress> expected : map1.entrySet()) {
+        final InetSocketAddress actual = map2.get(expected.getKey());
+        if (!expected.getValue().equals(actual)) {
+          same = false;
+          break;
+        }
+      }
+    }
+    return same;
+  }
+
+  private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
+    // Polls until the server resolves exactly `expected` of the given ids. A 30 second
+    // deadline fails the test instead of spinning forever if that state is never reached.
+    final long deadline = System.currentTimeMillis() + 30000;
+    int count;
+    do {
+      Assert.assertTrue("Timed out waiting for " + expected + " resolvable ids", System.currentTimeMillis() < deadline);
+      count = 0;
+      for (final @SuppressWarnings("unused") NameAssignment na : server.lookup(ids)) {
+        ++count;
+      }
+    } while (count != expected);
+  }
+}