You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:58 UTC

[25/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
new file mode 100644
index 0000000..fd2e80f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.storage.util.GetAllIterable;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * An {@link ExternalMap} held in main memory that stores its values in encoded form.
+ * <p>
+ * Each value is converted to a {@code byte[]} by the injected {@link Codec} before it is
+ * stored, and decoded again whenever it is read back.  Entries live in a
+ * {@link ConcurrentSkipListMap} created with its default (natural) key ordering, so the
+ * keys handed to this map must be mutually comparable at runtime (for example, all
+ * {@code String}s).
+ *
+ * @param <T> the type of the values stored in this map
+ */
+public class CodecRamMap<T> implements ExternalMap<T> {
+
+  private final Codec<T> c;
+  private final ConcurrentSkipListMap<CharSequence, byte[]> map;
+
+  /**
+   * Creates an empty map.
+   *
+   * @param ramStore not used by this class; presumably listed so that the injector
+   *                 creates the RAM storage service first (TODO confirm)
+   * @param c        the codec used to encode values on write and decode them on read
+   */
+  @Inject
+  public CodecRamMap(RamStorageService ramStore,
+                     @Parameter(RamMapCodec.class) final Codec<T> c) {
+    this.c = c;
+    this.map = new ConcurrentSkipListMap<CharSequence, byte[]>();
+  }
+
+  /**
+   * @param key the key to look up
+   * @return true if an entry is stored under {@code key}
+   */
+  @Override
+  public boolean containsKey(final CharSequence key) {
+    return map.containsKey(key);
+  }
+
+  /**
+   * @param key the key to look up
+   * @return the decoded value stored under {@code key}, or null if there is none
+   */
+  @Override
+  public T get(final CharSequence key) {
+    return decodeOrNull(map.get(key));
+  }
+
+  /**
+   * Encodes {@code value} and stores it under {@code key}.
+   *
+   * @param key   the key to store the value under
+   * @param value the value to encode and store
+   * @return the decoded value previously stored under {@code key}, or null if there was none
+   */
+  @Override
+  public T put(final CharSequence key, final T value) {
+    return decodeOrNull(map.put(key, c.encode(value)));
+  }
+
+  /**
+   * Removes the entry stored under {@code key}, if any.
+   *
+   * @param key the key whose entry is removed
+   * @return the decoded value that was removed, or null if there was none
+   */
+  @Override
+  public T remove(final CharSequence key) {
+    return decodeOrNull(map.remove(key));
+  }
+
+  /**
+   * Encodes and stores every entry of {@code m}.
+   *
+   * @param m the entries to copy into this map
+   */
+  @Override
+  public void putAll(final Map<? extends CharSequence, ? extends T> m) {
+    // Walk the entries rather than keySet() + get(): one lookup per entry, and the value
+    // that gets encoded is the one that belongs to the key being visited.
+    for (final Map.Entry<? extends CharSequence, ? extends T> entry : m.entrySet()) {
+      map.put(entry.getKey(), c.encode(entry.getValue()));
+    }
+  }
+
+  /**
+   * @param keys the keys to look up
+   * @return a lazy view over the entries of this map whose keys are in {@code keys}
+   */
+  @Override
+  public Iterable<Map.Entry<CharSequence, T>> getAll(
+      final Set<? extends CharSequence> keys) {
+    return new GetAllIterable<T>(keys, this);
+  }
+
+  /**
+   * Decodes a stored byte array, mapping "no entry" (null) to null.
+   */
+  private T decodeOrNull(final byte[] encoded) {
+    return encoded != null ? c.decode(encoded) : null;
+  }
+
+  /**
+   * Named parameter that identifies the codec injected into {@link CodecRamMap}.
+   */
+  @NamedParameter
+  public static class RamMapCodec implements Name<Codec<?>> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
new file mode 100644
index 0000000..35a20e1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.storage.util.GetAllIterable;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Minimal {@link ExternalMap} that keeps its entries in main memory.  No codec is
+ * needed to construct it, so it can always be instantiated; for that reason it is the
+ * default ExternalMap provided by StorageManagerRam.
+ *
+ * @param <T> the type of the values stored in this map
+ */
+public class RamMap<T> implements ExternalMap<T> {
+  private final ConcurrentSkipListMap<CharSequence, T> entries
+      = new ConcurrentSkipListMap<CharSequence, T>();
+
+  /**
+   * Creates an empty map.
+   *
+   * @param ramStore not used by this class
+   */
+  @Inject
+  public RamMap(RamStorageService ramStore) {
+  }
+
+  /** Reports whether an entry is stored under {@code key}. */
+  @Override
+  public boolean containsKey(CharSequence key) {
+    final boolean present = entries.containsKey(key);
+    return present;
+  }
+
+  /** Returns the value stored under {@code key}, or null if there is none. */
+  @Override
+  public T get(CharSequence key) {
+    final T current = entries.get(key);
+    return current;
+  }
+
+  /** Stores {@code value} under {@code key} and returns the value it replaced, if any. */
+  @Override
+  public T put(CharSequence key, T value) {
+    final T previous = entries.put(key, value);
+    return previous;
+  }
+
+  /** Removes the entry stored under {@code key} and returns its value, if any. */
+  @Override
+  public T remove(CharSequence key) {
+    final T removed = entries.remove(key);
+    return removed;
+  }
+
+  /** Copies every entry of {@code m} into this map. */
+  @Override
+  public void putAll(Map<? extends CharSequence, ? extends T> m) {
+    entries.putAll(m);
+  }
+
+  /** Returns a view over the entries of this map whose keys are in {@code keys}. */
+  @Override
+  public Iterable<Entry<CharSequence, T>> getAll(Set<? extends CharSequence> keys) {
+    final GetAllIterable<T> selected = new GetAllIterable<T>(keys, this);
+    return selected;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
new file mode 100644
index 0000000..dc7e606
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link Spool} implementation that is backed by RAM.
+ * <p>
+ * The spooled objects are kept in an ArrayList.  The accumulator can be obtained only
+ * once, and nothing can be appended any more after it has been closed or after
+ * {@link #iterator()} has been called.
+ *
+ * @param <T> the type of the spooled objects
+ */
+public final class RamSpool<T> implements Spool<T> {
+
+  private final List<T> items = new ArrayList<T>();
+  /** True until the accumulator is closed or an iterator is requested. */
+  private boolean appendable = true;
+  /** True until {@link #accumulator()} has handed out the accumulator. */
+  private boolean accumulatorAvailable = true;
+
+  /**
+   * Creates an empty spool.
+   *
+   * @param ramStore not used by this class
+   */
+  @Inject
+  public RamSpool(RamStorageService ramStore) {
+  }
+
+  /**
+   * Ends the append phase and returns an iterator over the spooled objects.
+   *
+   * @return an iterator over the backing list, in insertion order
+   */
+  @Override
+  public Iterator<T> iterator() {
+    appendable = false;
+    return items.iterator();
+  }
+
+  /**
+   * Hands out the accumulator that appends to this spool.
+   *
+   * @return the accumulator for this spool
+   * @throws UnsupportedOperationException if the accumulator was already handed out
+   */
+  @Override
+  public Accumulator<T> accumulator() {
+    if (accumulatorAvailable) {
+      accumulatorAvailable = false;
+      return new Appender();
+    }
+    throw new UnsupportedOperationException("Can only getAccumulator() once!");
+  }
+
+  /**
+   * Accumulator that appends to the enclosing spool while appending is still allowed.
+   */
+  private final class Appender implements Accumulator<T> {
+    @Override
+    public void add(T datum) {
+      if (appendable) {
+        items.add(datum);
+        return;
+      }
+      throw new ConcurrentModificationException("Attempt to append after creating iterator!");
+    }
+
+    @Override
+    public void close() {
+      appendable = false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
new file mode 100644
index 0000000..64e489a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.storage.ScratchSpace;
+import org.apache.reef.io.storage.StorageService;
+
+import javax.inject.Inject;
+
+/**
+ * {@link StorageService} implementation for main memory.  It does not offer any
+ * scratch space (yet).
+ */
+public class RamStorageService implements StorageService {
+  private static final String NO_SCRATCH_SPACE =
+      "No temp space / tracking of temp space for main memory (yet).";
+
+  @Inject
+  public RamStorageService() {
+  }
+
+  // TODO move getScratchSpace into its own class, just like everything else.
+  // TODO add context object or something to StorageService
+
+  /**
+   * Not supported for main memory.
+   *
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ScratchSpace getScratchSpace() {
+    final UnsupportedOperationException unsupported =
+        new UnsupportedOperationException(NO_SCRATCH_SPACE);
+    throw unsupported;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
new file mode 100644
index 0000000..175c7a6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * A {@link Spool} held in main memory that hands its elements back in sorted order.
+ * <p>
+ * Elements are added to a {@link PriorityQueue} through the accumulator.  Once the
+ * accumulator has been closed, the iterator drains the queue: every call to
+ * {@code next()} removes the head of the queue, so the data can be read only once.
+ * Both {@link #accumulator()} and {@link #iterator()} hand out their object on the
+ * first call only and return null on every later call.
+ *
+ * @param <T> the type of the spooled elements
+ */
+public class SortingRamSpool<T> implements Spool<T> {
+  /** Initial queue capacity passed along with an explicit comparator. */
+  private static final int INITIAL_CAPACITY = 11;
+
+  private final PriorityQueue<T> heap;
+  /** Becomes true when the accumulator is closed; from then on reading is allowed. */
+  private boolean ready = false;
+  private Accumulator<T> acc = new Accumulator<T>() {
+    @Override
+    public void add(T datum) throws StorageException {
+      if (ready) {
+        throw new IllegalStateException("add called after close!");
+      }
+      heap.add(datum);
+    }
+
+    @Override
+    public void close() throws StorageException {
+      ready = true;
+    }
+  };
+  private Iterator<T> it = new Iterator<T>() {
+
+    @Override
+    public boolean hasNext() {
+      return !heap.isEmpty();
+    }
+
+    @Override
+    public T next() {
+      // Removes the head of the queue; throws NoSuchElementException once it is empty.
+      return heap.remove();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "This iterator consumes the data it returns. remove() does not make any sense!");
+    }
+
+  };
+
+  /**
+   * Creates a spool that orders its elements by their natural ordering.
+   */
+  public SortingRamSpool() {
+    heap = new PriorityQueue<T>();
+  }
+
+  /**
+   * Creates a spool that orders its elements with the given comparator.
+   *
+   * @param c the comparator that defines the order in which elements are returned; a
+   *          comparator for any supertype of {@code T} is accepted
+   */
+  public SortingRamSpool(Comparator<? super T> c) {
+    heap = new PriorityQueue<T>(INITIAL_CAPACITY, c);
+  }
+
+  /**
+   * Hands out the draining iterator.
+   *
+   * @return the iterator on the first call, null on every later call
+   * @throws IllegalStateException if the accumulator has not been closed yet
+   */
+  @Override
+  public Iterator<T> iterator() {
+    if (!ready) {
+      throw new IllegalStateException("Cannot call iterator() while accumulator is still open!");
+    }
+    Iterator<T> ret = it;
+    it = null;
+    return ret;
+  }
+
+  /**
+   * Hands out the accumulator that feeds this spool.
+   *
+   * @return the accumulator on the first call, null on every later call
+   */
+  @Override
+  public Accumulator<T> accumulator() throws StorageException {
+    Accumulator<T> ret = acc;
+    acc = null;
+    return ret;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
new file mode 100644
index 0000000..b0b892d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.ExternalMap;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Lazy view over the entries of an {@link ExternalMap} whose keys belong to a given set.
+ * <p>
+ * Keys of the set that the map does not contain are skipped.  Values are fetched from
+ * the map at the moment {@code next()} is called, not when the view is created.
+ *
+ * @param <T> the type of the values stored in the map
+ */
+public class GetAllIterable<T> implements
+    Iterable<Map.Entry<CharSequence, T>> {
+  private final Set<? extends CharSequence> keys;
+  private final ExternalMap<T> map;
+
+  /**
+   * @param keys the keys to look up
+   * @param map  the map the keys are looked up in
+   */
+  public GetAllIterable(Set<? extends CharSequence> keys, ExternalMap<T> map) {
+    this.keys = keys;
+    this.map = map;
+  }
+
+  /**
+   * @return an iterator over the entries whose keys are present in the map; its
+   * {@code remove()} deletes the last returned entry from the map
+   */
+  @Override
+  public Iterator<Map.Entry<CharSequence, T>> iterator() {
+    final Iterator<? extends CharSequence> k = keys.iterator();
+    return new Iterator<Map.Entry<CharSequence, T>>() {
+      /** Key of the entry returned by the latest next(); null if remove() is not allowed. */
+      private CharSequence lastKey = null;
+      /** Next key known to be in the map, or null when the keys are exhausted. */
+      private CharSequence curKey = findNextKey();
+
+      /** Advances the key iterator to the next key the map contains. */
+      private CharSequence findNextKey() {
+        while (k.hasNext()) {
+          CharSequence next = k.next();
+          if (map.containsKey(next)) {
+            return next;
+          }
+        }
+        return null;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return curKey != null;
+      }
+
+      @Override
+      public Map.Entry<CharSequence, T> next() {
+        final CharSequence key = curKey;
+        // Fail before touching any state, so that a failed next() leaves lastKey intact.
+        if (key == null) {
+          throw new NoSuchElementException();
+        }
+        curKey = findNextKey();
+        lastKey = key;
+
+        final T v = map.get(key);
+
+        return new Map.Entry<CharSequence, T>() {
+          @Override
+          public CharSequence getKey() {
+            return key;
+          }
+
+          @Override
+          public T getValue() {
+            return v;
+          }
+
+          @Override
+          public T setValue(T v) {
+            throw new UnsupportedOperationException(
+                "No support for mutating values via iterator");
+          }
+        };
+      }
+
+      @Override
+      public void remove() {
+        // Iterator contract: remove() is legal once per next(), and never before the
+        // first next().  Without this guard a null key would be handed to the map.
+        if (lastKey == null) {
+          throw new IllegalStateException(
+              "remove() requires a preceding call to next() and may be called only once per element");
+        }
+        map.remove(lastKey);
+        lastKey = null;
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
new file mode 100644
index 0000000..12e1bd9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * {@link Codec} that stores an Integer as the bytes of its decimal string form.
+ * <p>
+ * Both directions use the same explicit charset, so the encoding does not depend on
+ * the platform default charset of the JVM that happens to run the code.
+ */
+public class IntegerCodec implements Codec<Integer> {
+
+  /** Charset used for both encoding and decoding. */
+  private static final java.nio.charset.Charset CHARSET =
+      java.nio.charset.StandardCharsets.UTF_8;
+
+  /**
+   * @param obj the value to encode
+   * @return the bytes of the decimal string representation of {@code obj}
+   */
+  @Override
+  public byte[] encode(Integer obj) {
+    return Integer.toString(obj).getBytes(CHARSET);
+  }
+
+  /**
+   * @param buf bytes as produced by {@link #encode(Integer)}
+   * @return the decoded value
+   * @throws NumberFormatException if the bytes do not spell a number
+   */
+  @Override
+  public Integer decode(byte[] buf) {
+    // Integer.decode is kept so that every input accepted before is still accepted.
+    return Integer.decode(new String(buf, CHARSET));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
new file mode 100644
index 0000000..5bd3641
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * {@link Deserializer} that reads Integers from an InputStream, each one in the
+ * four-byte form read by {@link DataInputStream#readInt()}.
+ * <p>
+ * All iterators obtained from one created Iterable read from the same underlying
+ * stream, so the data can be consumed only once.
+ */
+public class IntegerDeserializer implements
+    Deserializer<Integer, InputStream> {
+  /**
+   * @param arg the stream to read the integers from
+   * @return an Iterable whose iterators consume {@code arg}
+   */
+  @Override
+  public Iterable<Integer> create(InputStream arg) {
+    final DataInputStream dis = new DataInputStream(arg);
+    return new Iterable<Integer>() {
+
+      @Override
+      public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+          /** Value already read by hasNext() but not yet returned; null if none. */
+          private Integer lookahead = null;
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+
+          /**
+           * Returns the next integer of the stream.
+           *
+           * @throws ServiceRuntimeException if the stream cannot be read, which
+           *                                 includes reaching its end
+           */
+          @Override
+          public Integer next() {
+            if (lookahead != null) {
+              final Integer value = lookahead;
+              lookahead = null;
+              return value;
+            }
+            try {
+              return dis.readInt();
+            } catch (IOException e) {
+              throw new ServiceRuntimeException(e);
+            }
+          }
+
+          /**
+           * Reads one value ahead to find out whether the stream holds another integer.
+           *
+           * @return false once the end of the stream has been reached
+           * @throws ServiceRuntimeException if reading fails for any other reason
+           */
+          @Override
+          public boolean hasNext() {
+            if (lookahead != null) {
+              return true;
+            }
+            try {
+              lookahead = dis.readInt();
+              return true;
+            } catch (java.io.EOFException e) {
+              // End of stream is the regular way for the iteration to finish.
+              return false;
+            } catch (IOException e) {
+              throw new ServiceRuntimeException(e);
+            }
+          }
+        };
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
new file mode 100644
index 0000000..69296cc
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link Serializer} that writes Integers to an OutputStream, each one in the
+ * four-byte form written by {@link DataOutputStream#writeInt(int)}.
+ */
+public class IntegerSerializer implements
+    Serializer<Integer, OutputStream> {
+  /**
+   * @param arg the stream the integers are written to
+   * @return an Accumulable whose accumulators write to {@code arg}
+   */
+  @Override
+  public Accumulable<Integer> create(OutputStream arg) {
+    final DataOutputStream out = new DataOutputStream(arg);
+    return new Accumulable<Integer>() {
+
+      @Override
+      public Accumulator<Integer> accumulator() throws ServiceException {
+        return new IntWriter(out);
+      }
+    };
+  }
+
+  /**
+   * Accumulator that writes every added Integer to a data stream and closes that
+   * stream when it is closed itself.
+   */
+  private static final class IntWriter implements Accumulator<Integer> {
+    private final DataOutputStream sink;
+
+    IntWriter(final DataOutputStream sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public void add(Integer datum) throws ServiceException {
+      try {
+        sink.writeInt(datum);
+      } catch (IOException failure) {
+        throw new ServiceException(failure);
+      }
+    }
+
+    @Override
+    public void close() throws ServiceException {
+      try {
+        sink.close();
+      } catch (IOException failure) {
+        throw new ServiceException(failure);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
new file mode 100644
index 0000000..c1108f4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * {@link Deserializer} that reads Strings from an InputStream.  Each String is expected
+ * as a four-byte length followed by that many bytes of character data.
+ * <p>
+ * All iterators obtained from one created Iterable read from the same underlying
+ * stream, so the data can be consumed only once.
+ */
+public class StringDeserializer implements
+    Deserializer<String, InputStream> {
+  /**
+   * @param arg the stream to read the strings from
+   * @return an Iterable whose iterators consume {@code arg}
+   */
+  @Override
+  public Iterable<String> create(InputStream arg) {
+    final DataInputStream dis = new DataInputStream(arg);
+    return new Iterable<String>() {
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+          /** Value already read by hasNext() but not yet returned; null if none. */
+          private String lookahead = null;
+
+          /** Reads the character data of a record whose length prefix is {@code len}. */
+          private String readBody(final int len) throws IOException {
+            if (len < 0) {
+              throw new IOException("Negative string length in stream: " + len);
+            }
+            final byte[] b = new byte[len];
+            dis.readFully(b);
+            // NOTE(review): decodes with the platform default charset; the writer of the
+            // stream has to use the same charset - confirm this holds across machines.
+            return new String(b);
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+
+          /**
+           * Returns the next String of the stream.
+           *
+           * @throws ServiceRuntimeException if the stream cannot be read, which
+           *                                 includes reaching its end
+           */
+          @Override
+          public String next() {
+            if (lookahead != null) {
+              final String value = lookahead;
+              lookahead = null;
+              return value;
+            }
+            try {
+              return readBody(dis.readInt());
+            } catch (IOException e) {
+              throw new ServiceRuntimeException(e);
+            }
+          }
+
+          /**
+           * Reads one record ahead to find out whether the stream holds another String.
+           *
+           * @return false if the stream ends where the next length prefix would start
+           * @throws ServiceRuntimeException if reading fails for any other reason,
+           *                                 including a record that is cut short
+           */
+          @Override
+          public boolean hasNext() {
+            if (lookahead != null) {
+              return true;
+            }
+            final int len;
+            try {
+              len = dis.readInt();
+            } catch (java.io.EOFException e) {
+              // End of stream is the regular way for the iteration to finish.
+              return false;
+            } catch (IOException e) {
+              throw new ServiceRuntimeException(e);
+            }
+            try {
+              lookahead = readBody(len);
+            } catch (IOException e) {
+              throw new ServiceRuntimeException(e);
+            }
+            return true;
+          }
+        };
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
new file mode 100644
index 0000000..6cf544e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link Serializer} that writes Strings to an OutputStream.  Each String is written
+ * as a four-byte length followed by the bytes of its character data.
+ */
+public class StringSerializer implements
+    Serializer<String, OutputStream> {
+  /**
+   * @param arg the stream the strings are written to
+   * @return an Accumulable whose accumulators write to {@code arg}
+   */
+  @Override
+  public Accumulable<String> create(OutputStream arg) {
+    final DataOutputStream out = new DataOutputStream(arg);
+    return new Accumulable<String>() {
+
+      @Override
+      public Accumulator<String> accumulator() throws ServiceException {
+        return new StringWriter(out);
+      }
+    };
+  }
+
+  /**
+   * Accumulator that writes every added String to a data stream and closes that
+   * stream when it is closed itself.
+   */
+  private static final class StringWriter implements Accumulator<String> {
+    private final DataOutputStream sink;
+
+    StringWriter(final DataOutputStream sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public void add(String datum) throws ServiceException {
+      // NOTE(review): encodes with the platform default charset; the reader of the
+      // stream has to use the same charset - confirm this holds across machines.
+      final byte[] payload = datum.getBytes();
+      try {
+        sink.writeInt(payload.length);
+        sink.write(payload);
+      } catch (IOException failure) {
+        throw new ServiceException(failure);
+      }
+    }
+
+    @Override
+    public void close() throws ServiceException {
+      try {
+        sink.close();
+      } catch (IOException failure) {
+        throw new ServiceException(failure);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
new file mode 100644
index 0000000..ae690f4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.Tuple;
+
+import java.util.Comparator;
+
+/**
+ * Orders {@link Tuple}s by comparing their keys with a supplied key comparator; the
+ * values of the tuples are not consulted.
+ *
+ * @param <K> the key type of the compared tuples
+ * @param <V> the value type of the compared tuples
+ */
+public final class TupleKeyComparator<K, V> implements
+    Comparator<Tuple<K, V>> {
+  private final Comparator<K> keyOrder;
+
+  /**
+   * @param c the comparator applied to the keys of the tuples
+   */
+  public TupleKeyComparator(Comparator<K> c) {
+    keyOrder = c;
+  }
+
+  /**
+   * @return the result of comparing the key of {@code o1} with the key of {@code o2}
+   */
+  @Override
+  public int compare(Tuple<K, V> o1, Tuple<K, V> o2) {
+    final K leftKey = o1.getKey();
+    final K rightKey = o2.getKey();
+    return keyOrder.compare(leftKey, rightKey);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/proto/ns_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/proto/ns_protocol.proto b/lang/java/reef-io/src/main/proto/ns_protocol.proto
new file mode 100644
index 0000000..1a9db1e
--- /dev/null
+++ b/lang/java/reef-io/src/main/proto/ns_protocol.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.io.network.proto";
+option java_outer_classname = "ReefNetworkServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Message made of a source id, a destination id and a repeated list of
+// NSRecordPBuf records.
+// NOTE(review): srcid/destid presumably name the sending and receiving
+// endpoints - confirm against the code that builds this message.
+// Field tags start at 2; tag 1 is not used by this message.
+message NSMessagePBuf {
+  required string srcid = 2;
+  required string destid = 3;
+  repeated NSRecordPBuf msgs = 4;
+}
+
+// A single record holding one required byte payload. It is the element type
+// of NSMessagePBuf.msgs.
+message NSRecordPBuf {
+  required bytes data = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
new file mode 100644
index 0000000..f1f1a60
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class NameClientTest {
+
+  static int retryCount, retryTimeout;
+
+  static {
+    Tang tang = Tang.Factory.getTang();
+    try {
+      retryCount = tang.newInjector().getNamedInstance(NameLookupClient.RetryCount.class);
+      retryTimeout = tang.newInjector().getNamedInstance(NameLookupClient.RetryTimeout.class);
+    } catch (InjectionException e1) {
+      throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e1);
+    }
+  }
+
+  /**
+   * Class-level JUnit setup hook. The body is empty, so nothing is prepared
+   * before the tests of this class run.
+   *
+   * @throws java.lang.Exception declared by the signature; the empty body never throws
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  /**
+   * Class-level JUnit teardown hook. The body is empty, so nothing is released
+   * after the tests of this class have run.
+   *
+   * @throws java.lang.Exception declared by the signature; the empty body never throws
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  /**
+   * Test method for {@link org.apache.reef.io.network.naming.NameClient#close()}.
+   * Registers and unregisters one identifier, then lets try-with-resources close
+   * the client followed by the server.
+   *
+   * @throws Exception if the server, the client or the sleep fails
+   */
+  @Test
+  public final void testClose() throws Exception {
+    final IdentifierFactory idFactory = new StringIdentifierFactory();
+    try (NameServer nameServer = new NameServerImpl(0, idFactory)) {
+      final int boundPort = nameServer.getPort();
+      final String localAddress = NetUtils.getLocalAddress();
+      final NameCache cache = new NameCache(10000);
+      try (NameClient nameClient =
+               new NameClient(localAddress, boundPort, idFactory, retryCount, retryTimeout, cache)) {
+        final Identifier taskId = idFactory.getNewInstance("Task1");
+        final InetSocketAddress taskAddress = new InetSocketAddress(NetUtils.getLocalAddress(), 7001);
+        nameClient.register(taskId, taskAddress);
+        nameClient.unregister(taskId);
+        Thread.sleep(100);
+      }
+    }
+  }
+
+  /**
+   * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup}.
+   * Checks the caching behavior (expireAfterAccess vs. expireAfterWrite):
+   * changing NameCache's pattern to expireAfterAccess causes this test to fail.
+   *
+   * @throws Exception on any failure other than the expected ExecutionException
+   */
+  @Test
+  public final void testLookup() throws Exception {
+    final IdentifierFactory idFactory = new StringIdentifierFactory();
+    try (NameServer nameServer = new NameServerImpl(0, idFactory)) {
+      final int boundPort = nameServer.getPort();
+      final String localAddress = NetUtils.getLocalAddress();
+      final NameCache cache = new NameCache(150);
+      try (NameClient nameClient =
+               new NameClient(localAddress, boundPort, idFactory, retryCount, retryTimeout, cache)) {
+        final Identifier taskId = idFactory.getNewInstance("Task1");
+        nameClient.register(taskId, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+        nameClient.lookup(taskId); // caches the entry
+        nameClient.unregister(taskId);
+        Thread.sleep(100);
+        try {
+          // First lookup after un-registration; its result is not inspected.
+          nameClient.lookup(taskId);
+          Thread.sleep(100);
+          // With expireAfterAccess, the previous lookup would reset expiry to 150ms
+          // more, so the 100ms wait would not expire the item and the cached value
+          // would be returned. With expireAfterWrite, the extra 100ms wait expires
+          // the item, resulting in a NamingException, and the test passes.
+          final InetSocketAddress afterExpiry = nameClient.lookup(taskId);
+          Assert.assertNull("client.lookup(id)", afterExpiry);
+        } catch (Exception e) {
+          // Only an ExecutionException is an accepted outcome; anything else propagates.
+          if (!(e instanceof ExecutionException)) {
+            throw e;
+          }
+          Assert.assertTrue("Execution Exception cause is instanceof NamingException", e.getCause() instanceof NamingException);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
new file mode 100644
index 0000000..71382e2
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server and client test
+ */
+public class NamingTest {
+
+  private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
+  // Values injected for NameLookupClient.RetryCount / RetryTimeout; assigned once in the static block below.
+  private static final int retryCount;
+  private static final int retryTimeout;
+
+  // Resolve both retry parameters from a fresh Tang injector when the class is loaded.
+  // An injection failure is logged and rethrown unchecked, which aborts class initialization.
+  static {
+    try {
+      final Injector injector = Tang.Factory.getTang().newInjector();
+      retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
+      retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+    } catch (final InjectionException ex) {
+      final String msg = "Exception while trying to find default values for retryCount & Timeout";
+      LOG.log(Level.SEVERE, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  // Exposes the running test's method name; each test logs it at FINEST.
+  @Rule
+  public final TestName name = new TestName();
+  // Passed to every NameCache created by these tests.
+  // NOTE(review): presumably a time-to-live in milliseconds - confirm against NameCache.
+  final long TTL = 30000;
+  // Creates the Identifier instances used as lookup keys.
+  final IdentifierFactory factory = new StringIdentifierFactory();
+  // Port of the most recently started NameServer; each test overwrites it from server.getPort().
+  int port;
+
+  /**
+   * NameServer and NameLookupClient test: registers two names directly on the
+   * server, looks both up through a NameLookupClient and compares the answers
+   * with what was registered.
+   * <p>
+   * The server and the client are closed even when a lookup or an assertion
+   * fails, so a failing run does not leave them open for later tests.
+   *
+   * @throws Exception if the server, the client or a lookup fails
+   */
+  @Test
+  public void testNamingLookup() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // names
+    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+    // run a server
+    try (final NameServer server = new NameServerImpl(0, this.factory)) {
+      this.port = server.getPort();
+      for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+        server.register(entry.getKey(), entry.getValue());
+      }
+
+      // run a client
+      final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+          10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+      try {
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+        respMap.put(id1, client.lookup(id1));
+        respMap.put(id2, client.lookup(id2));
+
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+      } finally {
+        // Runs before the enclosing try-with-resources closes the server,
+        // preserving the original client-then-server close order.
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Test concurrent lookups (threads share a client): three tasks look up three
+   * different names through the same NameLookupClient, and the collected
+   * answers are compared with what was registered on the server.
+   * <p>
+   * Each round shuts its executor down and closes the client and the server
+   * even when a lookup or an assertion fails.
+   *
+   * @throws Exception if the server, the client or one of the lookup tasks fails
+   */
+  @Test
+  public void testConcurrentNamingLookup() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // test it 3 times to make failure likely
+    for (int i = 0; i < 3; i++) {
+
+      LOG.log(Level.FINEST, "test {0}", i);
+
+      // names
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+      idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
+
+      // run a server
+      try (final NameServer server = new NameServerImpl(0, this.factory)) {
+        this.port = server.getPort();
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          server.register(entry.getKey(), entry.getValue());
+        }
+
+        final ExecutorService executor = Executors.newCachedThreadPool();
+
+        // run a client
+        final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+            10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+        try {
+          final Identifier id1 = this.factory.getNewInstance("task1");
+          final Identifier id2 = this.factory.getNewInstance("task2");
+          final Identifier id3 = this.factory.getNewInstance("task3");
+
+          final ConcurrentMap<Identifier, InetSocketAddress> respMap =
+              new ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+          final Future<?> f1 = submitLookup(executor, client, id1, respMap);
+          final Future<?> f2 = submitLookup(executor, client, id2, respMap);
+          final Future<?> f3 = submitLookup(executor, client, id3, respMap);
+
+          // get() rethrows anything a task threw, wrapped in an ExecutionException.
+          f1.get();
+          f2.get();
+          f3.get();
+
+          for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+          }
+
+          Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        } finally {
+          // Stop the pool first so that no task keeps using the client being closed.
+          executor.shutdownNow();
+          client.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Submits one lookup task that stores the address found for {@code id} in {@code respMap}.
+   *
+   * @param executor pool that runs the task
+   * @param client   lookup client shared by all tasks
+   * @param id       identifier to look up
+   * @param respMap  receives the mapping from {@code id} to the address that was found
+   * @return the future of the submitted task
+   */
+  private static Future<?> submitLookup(final ExecutorService executor,
+                                        final NameLookupClient client,
+                                        final Identifier id,
+                                        final ConcurrentMap<Identifier, InetSocketAddress> respMap) {
+    return executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        InetSocketAddress addr = null;
+        try {
+          addr = client.lookup(id);
+        } catch (final Exception ex) {
+          LOG.log(Level.SEVERE, "Lookup failed", ex);
+          Assert.fail(ex.toString());
+        }
+        respMap.put(id, addr);
+      }
+    });
+  }
+
+  /**
+   * NameServer and NameRegistryClient test: registers two names through a
+   * NameRegistryClient, verifies that the server reports them, unregisters them
+   * again and verifies that the server no longer reports any of them.
+   * <p>
+   * The server and the client are closed even when a step or an assertion fails.
+   *
+   * @throws Exception if the server or the client fails
+   */
+  @Test
+  public void testNamingRegistry() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    try (final NameServer server = new NameServerImpl(0, this.factory)) {
+      this.port = server.getPort();
+
+      // names to start with
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+      // registration
+      // invoke registration from the client side
+      final NameRegistryClient client = new NameRegistryClient(
+          NetUtils.getLocalAddress(), this.port, this.factory);
+      try {
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          client.register(entry.getKey(), entry.getValue());
+        }
+
+        // wait until the server reports every registered name
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // check the server side
+        final Map<Identifier, InetSocketAddress> registered = new HashMap<Identifier, InetSocketAddress>();
+        for (final NameAssignment na : server.lookup(ids)) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+              new Object[]{na.getIdentifier(), na.getAddress()});
+          registered.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, registered));
+
+        // un-registration
+        for (final Identifier id : ids) {
+          client.unregister(id);
+        }
+
+        // wait until the server reports none of the names
+        busyWait(server, 0, ids);
+
+        final Map<Identifier, InetSocketAddress> remaining = new HashMap<Identifier, InetSocketAddress>();
+        for (final NameAssignment na : server.lookup(ids)) {
+          remaining.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertEquals(0, remaining.size());
+      } finally {
+        // Runs before the enclosing try-with-resources closes the server.
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * NameServer and NameClient test: registers two names through a NameClient,
+   * looks both up through the same client, unregisters them and verifies that
+   * the server no longer resolves either of them.
+   * <p>
+   * The server and the client are closed by try-with-resources, also when a step
+   * or an assertion fails.
+   *
+   * @throws Exception if the server, the client or a lookup fails
+   */
+  @Test
+  public void testNameClient() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    try (final NameServer server = new NameServerImpl(0, this.factory)) {
+      this.port = server.getPort();
+
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+      // registration
+      // invoke registration from the client side
+      try (final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
+          this.factory, retryCount, retryTimeout, new NameCache(this.TTL))) {
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : idToAddrMap.entrySet()) {
+          client.register(entry.getKey(), entry.getValue());
+        }
+
+        // wait until the server reports every registered name
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // lookup
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+        respMap.put(id1, client.lookup(id1));
+        respMap.put(id2, client.lookup(id2));
+
+        for (final Map.Entry<Identifier, InetSocketAddress> entry : respMap.entrySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{entry.getKey(), entry.getValue()});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+        // un-registration
+        for (final Identifier id : ids) {
+          client.unregister(id);
+        }
+
+        // wait until the server reports none of the names
+        busyWait(server, 0, ids);
+
+        // Ask the server about each identifier separately: both must be gone.
+        final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+        final InetSocketAddress remaining1 = server.lookup(id1);
+        if (remaining1 != null) {
+          serverMap.put(id1, remaining1);
+        }
+        final InetSocketAddress remaining2 = server.lookup(id2);
+        if (remaining2 != null) {
+          serverMap.put(id2, remaining2);
+        }
+
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
+  }
+
+  /**
+   * Tells whether both maps have the same size and every address in
+   * {@code map1} equals the address stored under the same identifier in {@code map2}.
+   *
+   * @param map1 reference mapping; its entries drive the comparison
+   * @param map2 mapping checked against {@code map1}
+   * @return {@code true} if the sizes match and every entry of {@code map1} has an equal address in {@code map2}
+   */
+  private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
+                          final Map<Identifier, InetSocketAddress> map2) {
+
+    boolean same = map1.size() == map2.size();
+
+    if (same) {
+      for (final Map.Entry<Identifier, InetSocketAddress> expected : map1.entrySet()) {
+        final InetSocketAddress actual = map2.get(expected.getKey());
+        if (!expected.getValue().equals(actual)) {
+          same = false;
+          break;
+        }
+      }
+    }
+
+    return same;
+  }
+
+  /**
+   * Polls the server until looking up {@code ids} yields exactly {@code expected}
+   * name assignments.
+   * <p>
+   * The loop pauses briefly between polls and fails the test after 30 seconds
+   * instead of spinning forever when the expected state is never reached.
+   *
+   * @param server   name server to poll
+   * @param expected number of assignments the lookup has to return
+   * @param ids      identifiers to look up
+   */
+  private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
+    final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
+    for (; ; ) {
+      int count = 0;
+      for (final @SuppressWarnings("unused") NameAssignment na : server.lookup(ids)) {
+        ++count;
+      }
+      if (count == expected) {
+        return;
+      }
+      // Subtraction keeps the comparison correct even if nanoTime wraps around.
+      if (System.nanoTime() - deadline >= 0) {
+        Assert.fail("Timed out waiting for the name server to report " + expected
+            + " assignment(s); last lookup returned " + count);
+      }
+      try {
+        Thread.sleep(10);
+      } catch (final InterruptedException ex) {
+        // Restore the interrupt flag and stop waiting.
+        Thread.currentThread().interrupt();
+        Assert.fail("Interrupted while waiting for the name server: " + ex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
new file mode 100644
index 0000000..96cf7d4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.impl.MessagingTransportFactory;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.NameServerImpl;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.services.network.util.Monitor;
+import org.apache.reef.services.network.util.StringCodec;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.Assert;
+import java.net.InetSocketAddress;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Network service test
+ */
+public class NetworkServiceTest {
+  private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * NetworkService messaging test: starts a name server plus a receiving
+   * ("task2") and a sending ("task1") network service, writes ten messages from
+   * the sender to the receiver and waits on the monitor handed to the
+   * receiver's message handler.
+   * <p>
+   * A NetworkException now fails the test instead of only being printed, and
+   * the connection, both services and the server are closed on failure as well.
+   *
+   * @throws Exception if any service fails or a message cannot be sent
+   */
+  @Test
+  public void testMessagingNetworkService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final String nameServerAddr = NetUtils.getLocalAddress();
+
+    try (final NameServer server = new NameServerImpl(0, factory)) {
+      final int nameServerPort = server.getPort();
+
+      final int numMessages = 10;
+      final Monitor monitor = new Monitor();
+
+      LOG.log(Level.FINEST, "=== Test network service receiver start");
+      // network service
+      final String name2 = "task2";
+      final NetworkService<String> ns2 = new NetworkService<String>(
+          factory, 0, nameServerAddr, nameServerPort,
+          new StringCodec(), new MessagingTransportFactory(),
+          new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+      try {
+        ns2.registerId(factory.getNewInstance(name2));
+        final int port2 = ns2.getTransport().getListeningPort();
+        server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        final String name1 = "task1";
+        final NetworkService<String> ns1 = new NetworkService<String>(
+            factory, 0, nameServerAddr, nameServerPort,
+            new StringCodec(), new MessagingTransportFactory(),
+            new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+        try {
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+          final Identifier destId = factory.getNewInstance(name2);
+          final Connection<String> conn = ns1.newConnection(destId);
+          try {
+            conn.open();
+            for (int count = 0; count < numMessages; ++count) {
+              conn.write("hello! " + count);
+            }
+            monitor.mwait();
+          } finally {
+            conn.close();
+          }
+        } finally {
+          ns1.close();
+        }
+      } finally {
+        ns2.close();
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark: for each message size it starts a
+   * receiving and a sending network service, writes
+   * {@code 300000 / max(1, size / 512)} messages of that size, waits on the
+   * receiver's monitor and logs the measured rate.
+   * <p>
+   * A NetworkException now fails the test instead of only being printed, and
+   * the connection, both services and the server are closed on failure as well.
+   *
+   * @throws Exception if any service fails or a message cannot be sent
+   */
+  @Test
+  public void testMessagingNetworkServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final String nameServerAddr = NetUtils.getLocalAddress();
+
+    try (final NameServer server = new NameServerImpl(0, factory)) {
+      final int nameServerPort = server.getPort();
+
+      final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+      for (final int size : messageSizes) {
+        final int numMessages = 300000 / (Math.max(1, size / 512));
+        final Monitor monitor = new Monitor();
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        // network service
+        final String name2 = "task2";
+        final NetworkService<String> ns2 = new NetworkService<String>(
+            factory, 0, nameServerAddr, nameServerPort,
+            new StringCodec(), new MessagingTransportFactory(),
+            new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+        try {
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+          LOG.log(Level.FINEST, "=== Test network service sender start");
+          final String name1 = "task1";
+          final NetworkService<String> ns1 = new NetworkService<String>(
+              factory, 0, nameServerAddr, nameServerPort,
+              new StringCodec(), new MessagingTransportFactory(),
+              new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+          try {
+            ns1.registerId(factory.getNewInstance(name1));
+            final int port1 = ns1.getTransport().getListeningPort();
+            server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+            final Identifier destId = factory.getNewInstance(name2);
+            final Connection<String> conn = ns1.newConnection(destId);
+            try {
+              // build the message
+              final StringBuilder msb = new StringBuilder(size);
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              final long start = System.currentTimeMillis();
+              // NOTE(review): open() is called before every write, whereas the shared-connection
+              // test in this class opens once; kept as is - confirm that open() is idempotent.
+              for (int i = 0; i < numMessages; i++) {
+                conn.open();
+                conn.write(message);
+              }
+              monitor.mwait();
+              final long end = System.currentTimeMillis();
+              final double runtime = ((double) end - start) / 1000;
+              LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars
+            } finally {
+              conn.close();
+            }
+          } finally {
+            ns1.close();
+          }
+        } finally {
+          ns2.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMessagingNetworkServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final String nameServerAddr = NetUtils.getLocalAddress();
+
+    final NameServer server = new NameServerImpl(0, factory);
+    final int nameServerPort = server.getPort();
+
+    BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+    int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    ExecutorService e = Executors.newCachedThreadPool();
+    for (int t = 0; t < numThreads; t++) {
+      final int tt = t;
+
+      e.submit(new Runnable() {
+        public void run() {
+          try {
+            Monitor monitor = new Monitor();
+
+            LOG.log(Level.FINEST, "=== Test network service receiver start");
+            // network service
+            final String name2 = "task2-" + tt;
+            NetworkService<String> ns2 = new NetworkService<String>(
+                factory, 0, nameServerAddr, nameServerPort,
+                new StringCodec(), new MessagingTransportFactory(),
+                new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+            ns2.registerId(factory.getNewInstance(name2));
+            final int port2 = ns2.getTransport().getListeningPort();
+            server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
+
+            LOG.log(Level.FINEST, "=== Test network service sender start");
+            final String name1 = "task1-" + tt;
+            NetworkService<String> ns1 = new NetworkService<String>(
+                factory, 0, nameServerAddr, nameServerPort,
+                new StringCodec(), new MessagingTransportFactory(),
+                new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+            ns1.registerId(factory.getNewInstance(name1));
+            final int port1 = ns1.getTransport().getListeningPort();
+            server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
+
+            Identifier destId = factory.getNewInstance(name2);
+            Connection<String> conn = ns1.newConnection(destId);
+
+            // build the message
+            StringBuilder msb = new StringBuilder();
+            for (int i = 0; i < size; i++) {
+              msb.append("1");
+            }
+            String message = msb.toString();
+
+
+            try {
+              for (int i = 0; i < numMessages; i++) {
+                conn.open();
+                conn.write(message);
+              }
+              monitor.mwait();
+            } catch (NetworkException e) {
+              e.printStackTrace();
+            }
+            conn.close();
+
+            ns1.close();
+            ns2.close();
+          } catch (Exception e) {
+            e.printStackTrace();
+
+          }
+        }
+      });
+    }
+
+    // start and time
+    long start = System.currentTimeMillis();
+    Object ignore = new Object();
+    for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+    e.shutdown();
+    e.awaitTermination(100, TimeUnit.SECONDS);
+    long end = System.currentTimeMillis();
+
+    double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+
+    server.close();
+  }
+
+  /**
+   * NetworkService messaging rate benchmark in which several sender threads
+   * write through one shared connection.
+   * <p>
+   * A name server, a receiving service ("task2") and a sending service
+   * ("task1") are created. For each configured message size, every sender
+   * thread writes {@code numMessages} copies of the same string through the
+   * single connection, and the test then blocks on the monitor that the
+   * receiving handler notifies once it has counted {@code totalNumMessages}
+   * messages. The measured rate is logged at FINEST; nothing is asserted.
+   *
+   * @throws Exception on any failure that is not caught inside the test body
+   */
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    IdentifierFactory factory = new StringIdentifierFactory();
+    String nameServerAddr = NetUtils.getLocalAddress();
+
+    // Port 0 is passed to the name server; the port actually in use is read back below.
+    NameServer server = new NameServerImpl(0, factory);
+    int nameServerPort = server.getPort();
+
+    final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+
+    for (int size : messageSizes) {
+      // Messages per sender thread. Integer division: for size 2000 this is 300000 / 3 = 100000.
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      int numThreads = 2;
+      int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+
+      LOG.log(Level.FINEST, "=== Test network service receiver start");
+      // network service
+      // The receiving handler notifies the monitor after totalNumMessages messages.
+      final String name2 = "task2";
+      NetworkService<String> ns2 = new NetworkService<String>(
+          factory, 0, nameServerAddr, nameServerPort,
+          new StringCodec(), new MessagingTransportFactory(),
+          new MessageHandler<String>(name2, monitor, totalNumMessages), new ExceptionHandler());
+      ns2.registerId(factory.getNewInstance(name2));
+      final int port2 = ns2.getTransport().getListeningPort();
+      server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+      LOG.log(Level.FINEST, "=== Test network service sender start");
+      // The sender-side handler is given no monitor and an expected count of 0.
+      final String name1 = "task1";
+      NetworkService<String> ns1 = new NetworkService<String>(
+          factory, 0, nameServerAddr, nameServerPort,
+          new StringCodec(), new MessagingTransportFactory(),
+          new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+      ns1.registerId(factory.getNewInstance(name1));
+      final int port1 = ns1.getTransport().getListeningPort();
+      server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+      // Opened once here; every sender thread below writes through this same connection.
+      Identifier destId = factory.getNewInstance(name2);
+      final Connection<String> conn = ns1.newConnection(destId);
+      conn.open();
+
+      // build the message
+      StringBuilder msb = new StringBuilder();
+      for (int i = 0; i < size; i++) {
+        msb.append("1");
+      }
+      final String message = msb.toString();
+
+      ExecutorService e = Executors.newCachedThreadPool();
+
+      // Timing starts before the sender tasks are submitted.
+      long start = System.currentTimeMillis();
+      for (int i = 0; i < numThreads; i++) {
+        e.submit(new Runnable() {
+
+          @Override
+          public void run() {
+            try {
+              for (int i = 0; i < numMessages; i++) {
+                conn.write(message);
+              }
+            } catch (NetworkException e) {
+              // NOTE(review): a failed write is only printed and ends this sender's loop; the
+              // receiver would then presumably never reach totalNumMessages and the
+              // monitor.mwait() call below would block - confirm.
+              e.printStackTrace();
+            }
+          }
+        });
+      }
+
+
+      e.shutdown();
+      // NOTE(review): the boolean result of awaitTermination (terminated vs. timed out) is ignored.
+      e.awaitTermination(30, TimeUnit.SECONDS);
+      // Block until the receiving handler has signalled the monitor.
+      monitor.mwait();
+
+      long end = System.currentTimeMillis();
+      double runtime = ((double) end - start) / 1000;
+
+      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+      conn.close();
+
+      // NOTE(review): the connection and services are closed only on the normal path, not if
+      // an exception escapes earlier in the loop body.
+      ns1.close();
+      ns2.close();
+    }
+
+    server.close();
+  }
+
+  /**
+   * NetworkService messaging rate benchmark with application-level batching.
+   * <p>
+   * For each configured message size, a single sender writes
+   * {@code numMessages} batches to the receiver; each batch is one string made
+   * of {@code batchSize / size} copies of the base message. The test then
+   * blocks on the monitor that the receiving handler notifies once it has
+   * counted {@code numMessages} messages. The measured rate is logged at
+   * FINEST; nothing is asserted.
+   *
+   * @throws Exception on any failure that is not caught inside the test body
+   */
+  @Test
+  public void testMessagingNetworkServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    IdentifierFactory factory = new StringIdentifierFactory();
+    String nameServerAddr = NetUtils.getLocalAddress();
+
+    // Port 0 is passed to the name server; the port actually in use is read back below.
+    NameServer server = new NameServerImpl(0, factory);
+    int nameServerPort = server.getPort();
+
+    // Target batch length, in characters of the concatenated string.
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (int size : messageSizes) {
+      // Number of batches to send. Integer division: this is 300 for each of the sizes listed above.
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+
+      LOG.log(Level.FINEST, "=== Test network service receiver start");
+      // network service
+      // The receiving handler notifies the monitor after numMessages messages.
+      final String name2 = "task2";
+      NetworkService<String> ns2 = new NetworkService<String>(
+          factory, 0, nameServerAddr, nameServerPort,
+          new StringCodec(), new MessagingTransportFactory(),
+          new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+      ns2.registerId(factory.getNewInstance(name2));
+      final int port2 = ns2.getTransport().getListeningPort();
+      server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+      LOG.log(Level.FINEST, "=== Test network service sender start");
+      // The sender-side handler is given no monitor and an expected count of 0.
+      final String name1 = "task1";
+      NetworkService<String> ns1 = new NetworkService<String>(
+          factory, 0, nameServerAddr, nameServerPort,
+          new StringCodec(), new MessagingTransportFactory(),
+          new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+      ns1.registerId(factory.getNewInstance(name1));
+      final int port1 = ns1.getTransport().getListeningPort();
+      server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+      Identifier destId = factory.getNewInstance(name2);
+      Connection<String> conn = ns1.newConnection(destId);
+
+      // build the message
+      StringBuilder msb = new StringBuilder();
+      for (int i = 0; i < size; i++) {
+        msb.append("1");
+      }
+      String message = msb.toString();
+
+      // The timed region includes building each batch string as well as sending it.
+      long start = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < numMessages; i++) {
+          StringBuilder sb = new StringBuilder();
+          for (int j = 0; j < batchSize / size; j++) {
+            sb.append(message);
+          }
+          // NOTE(review): open() is invoked before every write, not once up front - presumably
+          // it is harmless on an already open connection; confirm.
+          conn.open();
+          conn.write(sb.toString());
+        }
+        // Block until the receiving handler has signalled the monitor.
+        monitor.mwait();
+      } catch (NetworkException e) {
+        // A failed open/write is only printed; the wait above is skipped in that case.
+        e.printStackTrace();
+      }
+      long end = System.currentTimeMillis();
+      double runtime = ((double) end - start) / 1000;
+      // NOTE(review): the product is computed in int arithmetic and only then widened to long;
+      // 300 * 1024 * 1024 still fits in an int, but larger constants would overflow.
+      long numAppMessages = numMessages * batchSize / size;
+      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
+      conn.close();
+
+      // NOTE(review): the connection and services are closed only on the normal path, not if
+      // an exception escapes earlier in the loop body.
+      ns1.close();
+      ns2.close();
+    }
+
+    server.close();
+  }
+
+  /**
+   * Test message handler that counts the messages it receives and notifies a
+   * monitor when the count reaches the expected number.
+   *
+   * @param <T> type
+   */
+  class MessageHandler<T> implements EventHandler<Message<T>> {
+
+    private final String name;
+    private final int expected;
+    private final Monitor monitor;
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    /**
+     * @param name     label for this handler; currently not used by {@link #onNext}
+     * @param monitor  monitor to notify when the expected count is reached; it is
+     *                 only dereferenced at that moment
+     * @param expected number of messages after which the monitor is notified
+     */
+    MessageHandler(String name, Monitor monitor, int expected) {
+      this.name = name;
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    /**
+     * Counts the message and notifies the monitor when this message is the
+     * {@code expected}-th one.
+     *
+     * @param value the received message
+     */
+    @Override
+    public void onNext(Message<T> value) {
+      // Use the value returned by the atomic increment itself. Re-reading the counter with a
+      // separate get() could observe increments made by other threads in between, so the
+      // threshold could be seen by several callers or skipped altogether.
+      final int received = count.incrementAndGet();
+
+      // The payload is walked but deliberately not inspected.
+      // NOTE(review): presumably kept so that the batch is actually iterated - confirm before removing.
+      for (T obj : value.getData()) {
+        // intentionally empty
+      }
+
+      if (received == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Test exception handler: prints every reported exception to standard error
+   * and takes no further action.
+   */
+  class ExceptionHandler implements EventHandler<Exception> {
+    /**
+     * @param error the exception reported to this handler
+     */
+    @Override
+    public void onNext(final Exception error) {
+      final String rendered = String.valueOf(error);
+      System.err.println(rendered);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
new file mode 100644
index 0000000..c325ab9
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import java.io.Serializable;
+
+/**
+ * Event for testing: an immutable, serializable wrapper around one message string.
+ */
+public class TestEvent implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Assigned once in the constructor and never changed afterwards.
+  private final String message;
+
+  /**
+   * @param message the text carried by this event; stored as given, without validation
+   */
+  public TestEvent(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the text passed to the constructor
+   */
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
new file mode 100644
index 0000000..0d96670
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Logging helper for tests.
+ */
+public class LoggingUtils {
+  /**
+   * Sets the given level on the root logger and on a console handler attached to it.
+   * The first {@link ConsoleHandler} already registered on the root logger is reused;
+   * if there is none, a new one is created and registered.
+   *
+   * @param level the level to apply to both the console handler and the root logger
+   */
+  public static void setLoggingLevel(Level level) {
+    final Logger root = Logger.getLogger("");
+    final Handler[] installed = root.getHandlers();
+
+    // Stop at the first console handler found, if any.
+    ConsoleHandler console = null;
+    for (int idx = 0; idx < installed.length && console == null; idx++) {
+      if (installed[idx] instanceof ConsoleHandler) {
+        console = (ConsoleHandler) installed[idx];
+      }
+    }
+
+    if (console == null) {
+      console = new ConsoleHandler();
+      root.addHandler(console);
+    }
+
+    console.setLevel(level);
+    root.setLevel(level);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
new file mode 100644
index 0000000..ccfb54c
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * One-shot signal for tests: {@link #mwait()} blocks until {@link #mnotify()} has
+ * been called at least once. The flag is never reset.
+ */
+public class Monitor {
+  private AtomicBoolean finished = new AtomicBoolean(false);
+
+  /**
+   * Blocks the caller until the finished flag is set; returns immediately if it
+   * already is.
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  public void mwait() throws InterruptedException {
+    synchronized (this) {
+      // Re-check the flag after every wake-up.
+      for (;;) {
+        if (finished.get()) {
+          return;
+        }
+        wait();
+      }
+    }
+  }
+
+  /**
+   * Sets the finished flag and wakes up every thread blocked in {@link #mwait()}.
+   */
+  public void mnotify() {
+    synchronized (this) {
+      finished.set(true);
+      notifyAll();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
new file mode 100644
index 0000000..9c55976
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+
+/**
+ * Codec that converts strings to and from their UTF-8 byte representation.
+ * <p>
+ * The charset is named explicitly so that encoding and decoding agree even when
+ * the two sides run on JVMs with different default charsets.
+ */
+public class StringCodec implements Codec<String> {
+  /**
+   * @param obj the string to encode
+   * @return the UTF-8 bytes of {@code obj}
+   */
+  @Override
+  public byte[] encode(String obj) {
+    return obj.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * @param buf bytes to decode as UTF-8
+   * @return the decoded string
+   */
+  @Override
+  public String decode(byte[] buf) {
+    return new String(buf, java.nio.charset.StandardCharsets.UTF_8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
new file mode 100644
index 0000000..575ed33
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.PeriodicEvent;
+
+/**
+ * Handler that notifies a {@link Monitor} each time it receives a {@link PeriodicEvent}.
+ */
+public class TimeoutHandler implements EventHandler<PeriodicEvent> {
+
+  private final Monitor monitor;
+
+  /**
+   * @param monitor the monitor to notify on every event
+   */
+  public TimeoutHandler(final Monitor monitor) {
+    this.monitor = monitor;
+  }
+
+  /**
+   * Notifies the monitor; the event itself is not inspected.
+   *
+   * @param event the periodic event that triggered this call
+   */
+  @Override
+  public void onNext(final PeriodicEvent event) {
+    this.monitor.mnotify();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
new file mode 100644
index 0000000..f40b8c4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
new file mode 100644
index 0000000..7451ac8
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.storage;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.storage.ram.CodecRamMap;
+import org.apache.reef.io.storage.ram.RamMap;
+import org.apache.reef.io.storage.ram.RamStorageService;
+import org.apache.reef.io.storage.util.IntegerCodec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+
+/**
+ * Exercises the basic {@link ExternalMap} operations against the RAM-backed implementations.
+ */
+public class ExternalMapTest {
+  /**
+   * Runs the shared scenario against a {@link CodecRamMap} using an {@link IntegerCodec}.
+   */
+  @Test
+  public void testCodecRamMap() {
+    RamStorageService ramStore = new RamStorageService();
+    Codec<Integer> c = new IntegerCodec();
+    ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c);
+    genericTest(m);
+  }
+
+  /**
+   * Runs the shared scenario against a {@link RamMap}.
+   */
+  @Test
+  public void testRamMap() {
+    RamStorageService ramStore = new RamStorageService();
+    ExternalMap<Integer> m = new RamMap<>(ramStore);
+    genericTest(m);
+  }
+
+
+  /**
+   * Shared scenario: a single put and a bulk put, lookups of present and absent
+   * keys, a bulk read whose key set includes an absent key, and a removal.
+   *
+   * @param m the map under test
+   */
+  void genericTest(ExternalMap<Integer> m) {
+    m.put("foo", 42);
+    Map<String, Integer> smallMap = new HashMap<>();
+    smallMap.put("bar", 43);
+    smallMap.put("baz", 44);
+
+    m.putAll(smallMap);
+
+    Assert.assertEquals(44, (int) m.get("baz"));
+    Assert.assertEquals(43, (int) m.get("bar"));
+    Assert.assertEquals(42, (int) m.get("foo"));
+    Assert.assertNull(m.get("quuz"));
+
+    Assert.assertTrue(m.containsKey("bar"));
+    Assert.assertFalse(m.containsKey("quuz"));
+
+    // "quuz" was never stored, so the bulk read is expected to return only "bar" and "baz".
+    Set<String> barBaz = new HashSet<>();
+    barBaz.add("bar");
+    barBaz.add("baz");
+    barBaz.add("quuz");
+
+    Iterable<Map.Entry<CharSequence, Integer>> it = m.getAll(barBaz);
+
+    // Collect into a sorted map so that the checks below see the keys in a fixed order.
+    Map<CharSequence, Integer> found = new TreeMap<>();
+
+    for (Map.Entry<CharSequence, Integer> e : it) {
+      found.put(e.getKey(), e.getValue());
+    }
+    Iterator<CharSequence> it2 = found.keySet().iterator();
+    Assert.assertTrue(it2.hasNext());
+    CharSequence s = it2.next();
+    // Expected value first, as in the other assertions, so failure messages read correctly.
+    Assert.assertEquals("bar", s);
+    Assert.assertEquals(43, (int) found.get(s));
+    Assert.assertTrue(it2.hasNext());
+    s = it2.next();
+    Assert.assertEquals("baz", s);
+    Assert.assertEquals(44, (int) found.get(s));
+    Assert.assertFalse(it2.hasNext());
+
+    Assert.assertEquals(44, (int) m.remove("baz"));
+    Assert.assertFalse(m.containsKey("baz"));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
new file mode 100644
index 0000000..55857f9
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.storage.FramingInputStream;
+import org.apache.reef.io.storage.FramingOutputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Round-trip test for {@link FramingOutputStream} and {@link FramingInputStream}.
+ */
+public class FramingTest {
+
+  /** Number of frames written by the test; frame {@code i} is {@code i} bytes long. */
+  private static final int FRAME_COUNT = 256;
+
+  /**
+   * Writes 256 frames both directly and through an accumulator, checks that the
+   * two outputs are byte-identical, and reads them back frame by frame and by
+   * iteration.
+   */
+  @Test
+  public void frameRoundTripTest() throws IOException, ServiceException {
+    final ByteArrayOutputStream directBytes = new ByteArrayOutputStream();
+    final ByteArrayOutputStream accumulatedBytes = new ByteArrayOutputStream();
+    final FramingOutputStream direct = new FramingOutputStream(directBytes);
+    final FramingOutputStream viaAccumulator = new FramingOutputStream(accumulatedBytes);
+    final Accumulator<byte[]> accumulator = viaAccumulator.accumulator();
+
+    int expectedOffset = 0;
+    for (int len = 0; len < FRAME_COUNT; len++) {
+      final byte[] payload = frameOf(len);
+      direct.write(payload);
+      // The last frame is finished by closing the stream, all others by nextFrame().
+      if (len < FRAME_COUNT - 1) {
+        direct.nextFrame();
+      } else {
+        direct.close();
+      }
+      // Every frame is expected to advance the offset by its payload length plus 4
+      // (presumably a 4-byte length prefix - confirm against FramingOutputStream).
+      expectedOffset += 4 + len;
+      Assert.assertEquals(expectedOffset, direct.getCurrentOffset());
+      accumulator.add(payload);
+      Assert.assertEquals(expectedOffset, viaAccumulator.getCurrentOffset());
+    }
+    accumulator.close();
+    viaAccumulator.close();
+
+    final byte[] b1 = directBytes.toByteArray();
+    final byte[] b2 = accumulatedBytes.toByteArray();
+    Assert.assertArrayEquals(b1, b2);
+
+    // Read both encodings back one frame at a time.
+    final FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1));
+    final FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2));
+    for (int len = 0; len < FRAME_COUNT; len++) {
+      final byte[] wanted = frameOf(len);
+      final byte[] f = inA1.readFrame();
+      final byte[] g = inA2.readFrame();
+      Assert.assertArrayEquals(wanted, f);
+      Assert.assertArrayEquals(wanted, g);
+    }
+    // One more read past the last frame must yield null on both streams.
+    final byte[] pastEnd1 = inA1.readFrame();
+    final byte[] pastEnd2 = inA2.readFrame();
+    Assert.assertNull(pastEnd1);
+    Assert.assertNull(pastEnd2);
+    inA2.close();
+    inA1.close();
+
+    // Read both encodings back through iteration.
+    assertIterationYieldsAllFrames(b1);
+    assertIterationYieldsAllFrames(b2);
+    Assert.assertArrayEquals(b1, b2);
+  }
+
+  /**
+   * Builds the payload of frame {@code index}: {@code index} bytes, each set to {@code (byte) index}.
+   */
+  private static byte[] frameOf(int index) {
+    final byte[] frame = new byte[index];
+    Arrays.fill(frame, (byte) index);
+    return frame;
+  }
+
+  /**
+   * Iterates a {@link FramingInputStream} over {@code framed} and checks that it
+   * yields exactly the 256 expected frames in order, then closes the stream.
+   */
+  private static void assertIterationYieldsAllFrames(byte[] framed) throws IOException, ServiceException {
+    final FramingInputStream in = new FramingInputStream(new ByteArrayInputStream(framed));
+    int seen = 0;
+    for (byte[] actual : in) {
+      Assert.assertArrayEquals(frameOf(seen), actual);
+      seen++;
+    }
+    Assert.assertEquals(FRAME_COUNT, seen);
+    in.close();
+  }
+
+}