You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:59 UTC

[26/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
new file mode 100644
index 0000000..f16facd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
new file mode 100644
index 0000000..73a1cd4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Helper methods for turning Avro objects into bytes and back.
+ */
+final class AvroUtils {
+
+  private AvroUtils() {
+  }
+
+  /**
+   * Writes the given avro object into a new byte array using Avro's binary encoder.
+   *
+   * @param avroObject the object to serialize
+   * @param theClass   the class of {@code avroObject}, used to create the datum writer
+   * @param <T>        the type of the object being serialized
+   * @return the bytes produced by the binary encoder
+   */
+  static final <T> byte[] toBytes(T avroObject, Class<T> theClass) {
+    final DatumWriter<T> writer = new SpecificDatumWriter<>(theClass);
+    try (final ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      final BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(buffer, null);
+      writer.write(avroObject, binaryEncoder);
+      binaryEncoder.flush();
+      buffer.flush();
+      return buffer.toByteArray();
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to serialize an avro object", e);
+    }
+  }
+
+  /**
+   * Reads an object of the given class from its binary form; IOExceptions are rethrown unchecked.
+   */
+  static <T> T fromBytes(final byte[] theBytes, final Class<T> theClass) {
+    final BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(theBytes, null);
+    try {
+      return new SpecificDatumReader<T>(theClass).read(null, binaryDecoder);
+    } catch (final IOException e) {
+      throw new RuntimeException("Failed to deserialize an avro object", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
new file mode 100644
index 0000000..eda5f15
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Naming lookup request holding the identifiers to look up.
+ */
+public class NamingLookupRequest extends NamingMessage {
+  private final Iterable<Identifier> ids;
+
+  /**
+   * Constructs a naming lookup request.
+   *
+   * @param ids the iterable of identifiers; stored as given, without copying
+   */
+  public NamingLookupRequest(final Iterable<Identifier> ids) {
+    this.ids = ids;
+  }
+
+  /**
+   * Gets the identifiers carried by this request.
+   *
+   * @return the iterable of identifiers passed to the constructor
+   */
+  public Iterable<Identifier> getIdentifiers() {
+    return ids;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
new file mode 100644
index 0000000..b388ba6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.avro.AvroNamingLookupRequest;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Codec that converts naming lookup requests to and from their Avro binary form.
+ */
+public final class NamingLookupRequestCodec implements Codec<NamingLookupRequest> {
+
+  private final IdentifierFactory factory;
+
+  /**
+   * Creates a codec that uses the given factory to rebuild identifiers on decode.
+   *
+   * @param factory the identifier factory
+   */
+  @Inject
+  public NamingLookupRequestCodec(final IdentifierFactory factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Serializes the request by writing out the string form of each identifier.
+   *
+   * @param obj the naming lookup request
+   * @return the Avro-encoded bytes
+   */
+  @Override
+  public byte[] encode(final NamingLookupRequest obj) {
+    final List<CharSequence> names = new ArrayList<>();
+    for (final Identifier identifier : obj.getIdentifiers()) {
+      names.add(identifier.toString());
+    }
+    final AvroNamingLookupRequest avroRequest = AvroNamingLookupRequest.newBuilder().setIds(names).build();
+    return AvroUtils.toBytes(avroRequest, AvroNamingLookupRequest.class);
+  }
+
+  /**
+   * Rebuilds a lookup request, creating one identifier per decoded string.
+   *
+   * @param buf the Avro-encoded bytes
+   * @return a naming lookup request
+   */
+  @Override
+  public NamingLookupRequest decode(final byte[] buf) {
+    final AvroNamingLookupRequest avroRequest = AvroUtils.fromBytes(buf, AvroNamingLookupRequest.class);
+    final List<Identifier> identifiers = new ArrayList<>(avroRequest.getIds().size());
+    for (final CharSequence name : avroRequest.getIds()) {
+      identifiers.add(factory.getNewInstance(name.toString()));
+    }
+    return new NamingLookupRequest(identifiers);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
new file mode 100644
index 0000000..822f170
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+
+import java.util.List;
+
+/**
+ * Naming lookup response holding a list of name assignments.
+ */
+public class NamingLookupResponse extends NamingMessage {
+  private final List<NameAssignment> nas;
+
+  /**
+   * Creates a response wrapping the given name assignments.
+   *
+   * @param nas the list of name assignments; kept as given, without copying
+   */
+  public NamingLookupResponse(List<NameAssignment> nas) {
+    this.nas = nas;
+  }
+
+  /**
+   * Returns the name assignments held by this response.
+   *
+   * @return the list supplied at construction time
+   */
+  public List<NameAssignment> getNameAssignments() {
+    return this.nas;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
new file mode 100644
index 0000000..affe3b6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.NameAssignmentTuple;
+import org.apache.reef.io.network.naming.avro.AvroNamingAssignment;
+import org.apache.reef.io.network.naming.avro.AvroNamingLookupResponse;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Codec that converts naming lookup responses to and from their Avro binary form.
+ */
+public final class NamingLookupResponseCodec implements Codec<NamingLookupResponse> {
+
+  private final IdentifierFactory factory;
+
+  /**
+   * Creates a codec that uses the given factory to rebuild identifiers on decode.
+   *
+   * @param factory the identifier factory
+   */
+  @Inject
+  public NamingLookupResponseCodec(final IdentifierFactory factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Serializes every name assignment as an (id, host, port) Avro record.
+   *
+   * @param obj the naming lookup response
+   * @return the Avro-encoded bytes
+   */
+  @Override
+  public byte[] encode(NamingLookupResponse obj) {
+    final List<NameAssignment> nameAssignments = obj.getNameAssignments();
+    final List<AvroNamingAssignment> tuples = new ArrayList<>(nameAssignments.size());
+    for (final NameAssignment na : nameAssignments) {
+      // The address travels as host name plus port; the identifier as its string form.
+      final AvroNamingAssignment tuple = AvroNamingAssignment.newBuilder()
+          .setId(na.getIdentifier().toString())
+          .setHost(na.getAddress().getHostName())
+          .setPort(na.getAddress().getPort())
+          .build();
+      tuples.add(tuple);
+    }
+    final AvroNamingLookupResponse avroResponse = AvroNamingLookupResponse.newBuilder().setTuples(tuples).build();
+    return AvroUtils.toBytes(avroResponse, AvroNamingLookupResponse.class);
+  }
+
+  /**
+   * Rebuilds the name assignments from the decoded Avro records.
+   *
+   * @param buf the Avro-encoded bytes
+   * @return a naming lookup response
+   * @throws NamingRuntimeException NOTE(review): no throw site is visible in this method - confirm
+   */
+  @Override
+  public NamingLookupResponse decode(final byte[] buf) {
+    final AvroNamingLookupResponse decoded = AvroUtils.fromBytes(buf, AvroNamingLookupResponse.class);
+    final List<NameAssignment> assignments = new ArrayList<>(decoded.getTuples().size());
+    for (final AvroNamingAssignment entry : decoded.getTuples()) {
+      // Identifiers are re-created by the factory; host and port form the socket address.
+      assignments.add(new NameAssignmentTuple(
+          factory.getNewInstance(entry.getId().toString()),
+          new InetSocketAddress(entry.getHost().toString(), entry.getPort())));
+    }
+    return new NamingLookupResponse(assignments);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
new file mode 100644
index 0000000..3152854
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.remote.transport.Link;
+
+/**
+ * Abstract parent of the naming messages; stores a link that can be read and replaced.
+ */
+public abstract class NamingMessage {
+  private transient Link<byte[]> link;
+
+  /**
+   * Returns the link currently stored on this message.
+   *
+   * @return the stored link; {@code null} until {@link #setLink(Link)} has been called
+   */
+  public Link<byte[]> getLink() {
+    return this.link;
+  }
+
+  /**
+   * Replaces the link stored on this message.
+   *
+   * @param link the link to store
+   */
+  public void setLink(Link<byte[]> link) {
+    this.link = link;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
new file mode 100644
index 0000000..e6e1e9c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+
+/**
+ * Naming registration request holding a single name assignment.
+ */
+public class NamingRegisterRequest extends NamingMessage {
+  private final NameAssignment na;
+
+  /**
+   * Creates a registration request for the given name assignment.
+   *
+   * @param na the name assignment
+   */
+  public NamingRegisterRequest(NameAssignment na) {
+    this.na = na;
+  }
+
+  /**
+   * Returns the name assignment carried by this request.
+   *
+   * @return the name assignment given at construction time
+   */
+  public NameAssignment getNameAssignment() {
+    return this.na;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
new file mode 100644
index 0000000..c32d888
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.NameAssignmentTuple;
+import org.apache.reef.io.network.naming.avro.AvroNamingRegisterRequest;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Naming registration request codec
+ */
+public class NamingRegisterRequestCodec implements Codec<NamingRegisterRequest> {
+
+  private final IdentifierFactory factory;
+
+  /**
+   * Constructs a naming registration request codec
+   *
+   * @param factory the identifier factory
+   */
+  public NamingRegisterRequestCodec(IdentifierFactory factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Encodes the name assignment to bytes
+   *
+   * @param obj the naming registration request
+   * @return a byte array
+   */
+  @Override
+  public byte[] encode(NamingRegisterRequest obj) {
+    final AvroNamingRegisterRequest result = AvroNamingRegisterRequest.newBuilder()
+        .setId(obj.getNameAssignment().getIdentifier().toString())
+        .setHost(obj.getNameAssignment().getAddress().getHostName())
+        .setPort(obj.getNameAssignment().getAddress().getPort())
+        .build();
+    return AvroUtils.toBytes(result, AvroNamingRegisterRequest.class);
+  }
+
+  /**
+   * Decodes the bytes to a name assignment
+   *
+   * @param buf the byte array
+   * @return a naming registration request
+   * @throws NamingRuntimeException
+   */
+  @Override
+  public NamingRegisterRequest decode(byte[] buf) {
+    final AvroNamingRegisterRequest avroNamingRegisterRequest = AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class);
+    return new NamingRegisterRequest(
+        new NameAssignmentTuple(factory.getNewInstance(avroNamingRegisterRequest.getId().toString()),
+            new InetSocketAddress(avroNamingRegisterRequest.getHost().toString(), avroNamingRegisterRequest.getPort()))
+    );
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
new file mode 100644
index 0000000..b8c416f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+/**
+ * Naming registration response; wraps the registration request it was built from.
+ */
+public class NamingRegisterResponse extends NamingMessage {
+  private final NamingRegisterRequest request;
+
+  /**
+   * Creates a response around the given registration request.
+   *
+   * @param request the naming register request
+   */
+  public NamingRegisterResponse(NamingRegisterRequest request) {
+    this.request = request;
+  }
+
+  /**
+   * Returns the registration request wrapped by this response.
+   *
+   * @return the naming register request given at construction time
+   */
+  public NamingRegisterRequest getRequest() {
+    return this.request;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
new file mode 100644
index 0000000..f95a90e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.remote.Codec;
+
+/**
+ * Codec for naming registration responses; delegates to the request codec.
+ */
+public class NamingRegisterResponseCodec implements Codec<NamingRegisterResponse> {
+  private final NamingRegisterRequestCodec codec;
+
+  /**
+   * Creates a response codec backed by the given request codec.
+   * @param codec the naming register request codec
+   */
+  public NamingRegisterResponseCodec(NamingRegisterRequestCodec codec) {
+    this.codec = codec;
+  }
+
+  /**
+   * Serializes the response by encoding the request it wraps.
+   *
+   * @param obj the naming register response
+   * @return the bytes produced by the request codec
+   */
+  @Override
+  public byte[] encode(NamingRegisterResponse obj) {
+    final NamingRegisterRequest wrapped = obj.getRequest();
+    return codec.encode(wrapped);
+  }
+
+  /**
+   * Decodes a request from the bytes and wraps it in a new response.
+   *
+   * @param buf the byte array
+   * @return a naming register response
+   */
+  @Override
+  public NamingRegisterResponse decode(byte[] buf) {
+    final NamingRegisterRequest wrapped = codec.decode(buf);
+    return new NamingRegisterResponse(wrapped);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
new file mode 100644
index 0000000..770b7c9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Naming un-registration request holding a single identifier.
+ */
+public class NamingUnregisterRequest extends NamingMessage {
+  private final Identifier id;
+
+  /**
+   * Creates an un-registration request for the given identifier.
+   *
+   * @param id the identifier
+   */
+  public NamingUnregisterRequest(Identifier id) {
+    this.id = id;
+  }
+
+  /**
+   * Returns the identifier carried by this request.
+   *
+   * @return the identifier given at construction time
+   */
+  public Identifier getIdentifier() {
+    return this.id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
new file mode 100644
index 0000000..ea505c3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.avro.AvroNamingUnRegisterRequest;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+/**
+ * Codec that converts {@link NamingUnregisterRequest} objects to and from the bytes
+ * of an {@link AvroNamingUnRegisterRequest}.
+ */
+public final class NamingUnregisterRequestCodec implements Codec<NamingUnregisterRequest> {
+
+  private final IdentifierFactory factory;
+
+  /**
+   * Creates the codec.
+   *
+   * @param factory the identifier factory used to rebuild identifiers while decoding
+   */
+  @Inject
+  public NamingUnregisterRequestCodec(final IdentifierFactory factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Serializes an un-registration request; only the string form of its identifier
+   * is stored in the Avro record.
+   *
+   * @param obj the naming un-registration request
+   * @return the bytes produced by {@code AvroUtils.toBytes} for the Avro record
+   */
+  @Override
+  public byte[] encode(final NamingUnregisterRequest obj) {
+    final String idString = obj.getIdentifier().toString();
+    final AvroNamingUnRegisterRequest avroRequest =
+        AvroNamingUnRegisterRequest.newBuilder().setId(idString).build();
+    return AvroUtils.toBytes(avroRequest, AvroNamingUnRegisterRequest.class);
+  }
+
+  /**
+   * Deserializes an un-registration request; the identifier is re-created from its
+   * string form through the identifier factory.
+   *
+   * @param buf the byte array
+   * @return a naming un-registration request
+   * @throws NamingRuntimeException presumably raised by {@code AvroUtils.fromBytes} when the
+   *                                bytes cannot be read (not visible here; confirm in AvroUtils)
+   */
+  @Override
+  public NamingUnregisterRequest decode(final byte[] buf) {
+    final AvroNamingUnRegisterRequest avroRequest =
+        AvroUtils.fromBytes(buf, AvroNamingUnRegisterRequest.class);
+    final String idString = avroRequest.getId().toString();
+    return new NamingUnregisterRequest(this.factory.getNewInstance(idString));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
new file mode 100644
index 0000000..d4ff9d3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Contains naming serialization codecs
+ */
+package org.apache.reef.io.network.naming.serialization;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
new file mode 100644
index 0000000..4c53530
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
new file mode 100644
index 0000000..5e68871
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Codec for lists. Every element is encoded with the wrapped element codec and stored
+ * as a 4-byte length (written with {@code DataOutputStream.writeInt}) followed by the
+ * element's bytes.
+ *
+ * @param <T> the element type
+ */
+public final class ListCodec<T> implements Codec<List<T>> {
+
+  private static final Logger LOG = Logger.getLogger(ListCodec.class.getName());
+
+  private final Codec<T> codec;
+
+  /**
+   * @param codec the codec applied to each individual list element
+   */
+  public ListCodec(final Codec<T> codec) {
+    this.codec = codec;
+  }
+
+  /**
+   * Encodes every element in iteration order as a length-prefixed byte sequence.
+   *
+   * @param list the list to encode
+   * @return the concatenated length-prefixed element encodings
+   * @throws RuntimeException wrapping any IOException raised while writing
+   */
+  @Override
+  public byte[] encode(final List<T> list) {
+    try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+         final DataOutputStream dataOut = new DataOutputStream(bytesOut)) {
+      for (final T element : list) {
+        final byte[] encoded = this.codec.encode(element);
+        dataOut.writeInt(encoded.length);
+        dataOut.write(encoded);
+      }
+      return bytesOut.toByteArray();
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Error in list encoding", ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Decodes length-prefixed elements until no more bytes are available.
+   *
+   * @param list the bytes produced by {@link #encode(List)}
+   * @return the decoded elements, in the order they were read
+   * @throws RuntimeException wrapping any IOException raised while reading
+   */
+  @Override
+  public List<T> decode(final byte[] list) {
+    try (final DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(list))) {
+      final List<T> elements = new ArrayList<>();
+      while (dataIn.available() > 0) {
+        final byte[] encoded = new byte[dataIn.readInt()];
+        dataIn.readFully(encoded);
+        elements.add(this.codec.decode(encoded));
+      }
+      return elements;
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Error in list decoding", ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Small manual round-trip check: encodes a list of strings, decodes it again and
+   * logs both at FINEST level.
+   *
+   * @param args unused
+   */
+  public static void main(final String[] args) {
+    final List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five");
+    LOG.log(Level.FINEST, "Input: {0}", input);
+    final ListCodec<String> listCodec = new ListCodec<>(new StringCodec());
+    final byte[] encoded = listCodec.encode(input);
+    LOG.log(Level.FINEST, "Output: {0}", listCodec.decode(encoded));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
new file mode 100644
index 0000000..f1a3a07
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import java.io.Serializable;
+
+public final class Pair<T1, T2> implements Serializable {
+
+  public final T1 first;
+  public final T2 second;
+
+  private String pairStr = null;
+
+  public Pair(final T1 first, final T2 second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  @Override
+  public String toString() {
+    if (this.pairStr == null) {
+      this.pairStr = "(" + this.first + "," + this.second + ")";
+    }
+    return this.pairStr;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
new file mode 100644
index 0000000..e4013bf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+
+/**
+ * Codec that converts strings to and from their UTF-8 byte representation.
+ */
+public class StringCodec implements Codec<String> {
+
+  // An explicit charset keeps the encoding identical on every JVM. The no-argument
+  // String.getBytes() / new String(byte[]) use the platform default charset, which
+  // can differ between the encoding and the decoding machine.
+  private static final java.nio.charset.Charset CHARSET =
+      java.nio.charset.StandardCharsets.UTF_8;
+
+  @Inject
+  public StringCodec() {
+    // Intentionally blank
+  }
+
+  /**
+   * Encodes the string as UTF-8.
+   *
+   * @param obj the string to encode
+   * @return the UTF-8 bytes of the string
+   */
+  @Override
+  public byte[] encode(String obj) {
+    return obj.getBytes(CHARSET);
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a string.
+   *
+   * @param buf the bytes to decode
+   * @return the decoded string
+   */
+  @Override
+  public String decode(byte[] buf) {
+    return new String(buf, CHARSET);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
new file mode 100644
index 0000000..5d6454b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.ComparableIdentifier;
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Identifier backed by a single string; the string is also its {@link #toString()} form.
+ */
+public class StringIdentifier implements ComparableIdentifier {
+
+  private final String str;
+
+  /**
+   * Constructs a string identifier
+   *
+   * @param str a string
+   */
+  StringIdentifier(String str) {
+    this.str = str;
+  }
+
+  /**
+   * Returns a hash code for the object
+   *
+   * @return the hash code of the wrapped string, or 0 when the string is null
+   */
+  @Override
+  public int hashCode() {
+    // compareTo() tolerates a null string, so hashCode() must not throw on one either.
+    return str == null ? 0 : str.hashCode();
+  }
+
+  /**
+   * Checks that another object is equal to this object
+   *
+   * @param o another object
+   * @return true if {@code o} is a StringIdentifier whose string form equals this
+   * identifier's string; false otherwise, including for null and for other types
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Guard the cast: equals() must return false rather than throw for null or foreign types.
+    if (!(o instanceof StringIdentifier)) {
+      return false;
+    }
+    final String other = ((StringIdentifier) o).toString();
+    return str == null ? other == null : str.equals(other);
+  }
+
+  /**
+   * Returns a string representation of the object
+   *
+   * @return the wrapped string
+   */
+  @Override
+  public String toString() {
+    return str;
+  }
+
+  /**
+   * Compares this identifier's string with the string form of another identifier.
+   * A null identifier and an identifier whose string form is null are both treated
+   * as a null string, which sorts before every non-null string.
+   *
+   * @param o the identifier to compare with, may be null
+   * @return a negative value, zero or a positive value as this identifier's string is
+   * less than, equal to or greater than the other string
+   */
+  @Override
+  public int compareTo(Identifier o) {
+    final String other = (o == null) ? null : o.toString();
+    if (str == null) {
+      return other == null ? 0 : -1;
+    }
+    if (other == null) {
+      return 1;
+    }
+    return str.compareTo(other);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
new file mode 100644
index 0000000..9886a01
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * {@link IdentifierFactory} implementation that produces {@link StringIdentifier} objects.
+ */
+public class StringIdentifierFactory implements IdentifierFactory {
+
+  /**
+   * Injectable no-argument constructor; the factory keeps no state.
+   */
+  @Inject
+  public StringIdentifierFactory() {
+  }
+
+  /**
+   * Wraps the given string in a new {@link StringIdentifier}.
+   *
+   * @param s the string the identifier is built from
+   * @return a new string identifier for {@code s}
+   */
+  @Override
+  public Identifier getNewInstance(final String s) {
+    final Identifier identifier = new StringIdentifier(s);
+    return identifier;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
new file mode 100644
index 0000000..4335f8f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
new file mode 100644
index 0000000..a40222b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * Data input stream that reads length-prefixed frames: a 4-byte length followed by that
+ * many payload bytes. A length of -1 marks the end of the frames.
+ */
+public class FramingInputStream extends DataInputStream implements Iterable<byte[]> {
+
+  /**
+   * @param in the underlying stream the frames are read from
+   */
+  public FramingInputStream(InputStream in) {
+    super(in);
+  }
+
+  /**
+   * Reads the next frame from the stream.
+   *
+   * @return the frame payload, or null when the end marker (-1) is read
+   * @throws IOException if reading fails, the stream ends before the frame is complete,
+   *                     or the length is negative but not the end marker
+   */
+  public byte[] readFrame() throws IOException {
+    final int length = readInt();
+    if (length == -1) {
+      return null;
+    }
+    if (length < 0) {
+      // Report corrupt input as an I/O problem instead of a NegativeArraySizeException.
+      throw new IOException("Invalid frame length: " + length);
+    }
+    final byte[] frame = new byte[length];
+    readFully(frame);
+    return frame;
+  }
+
+  /**
+   * Returns an iterator over the remaining frames. The first frame is read as soon as
+   * the iterator is created, and each call to next() reads one frame ahead.
+   *
+   * @return an iterator over the frame payloads; remove() is not supported
+   * @throws ServiceRuntimeException if reading a frame fails
+   */
+  @Override
+  public Iterator<byte[]> iterator() {
+    try {
+      return new Iterator<byte[]>() {
+        // Look-ahead frame; null once the end marker has been read.
+        byte[] cur = readFrame();
+
+        @Override
+        public boolean hasNext() {
+          return cur != null;
+        }
+
+        @Override
+        public byte[] next() {
+          if (cur == null) {
+            // Honour the Iterator contract and never read past the end marker.
+            throw new java.util.NoSuchElementException();
+          }
+          final byte[] ret = cur;
+          try {
+            cur = readFrame();
+          } catch (IOException e) {
+            throw new ServiceRuntimeException(e);
+          }
+          return ret;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    } catch (IOException e) {
+      throw new ServiceRuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
new file mode 100644
index 0000000..11547a6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that groups written bytes into length-prefixed frames. Bytes passed to
+ * the write methods are buffered in memory; {@link #nextFrame()} emits the buffer to the
+ * underlying stream as a 4-byte length followed by the buffered bytes. A length of -1
+ * is written as the end marker when the stream is closed.
+ */
+public class FramingOutputStream extends OutputStream implements Accumulable<byte[]> {
+
+  // Holds the bytes of the frame currently being assembled.
+  private final ByteArrayOutputStream baos;
+  // Underlying stream that receives the length prefixes, frames and end marker.
+  private final DataOutputStream o;
+  // Running count of payload bytes accepted plus 4 for every length prefix written.
+  private long offset;
+  private boolean closed;
+
+  /**
+   * @param o the stream to write frames to; it is wrapped in a DataOutputStream unless
+   *          it already is one
+   */
+  public FramingOutputStream(OutputStream o) {
+    if (!(o instanceof DataOutputStream)) {
+      this.o = new DataOutputStream(o);
+    } else {
+      this.o = (DataOutputStream) o;
+    }
+    this.baos = new ByteArrayOutputStream();
+  }
+
+  /**
+   * Writes the buffered bytes to the underlying stream as one frame (length prefix
+   * followed by the bytes) and clears the buffer.
+   *
+   * @throws StorageException wrapping the IOException if writing fails
+   */
+  public void nextFrame() throws StorageException {
+    try {
+      o.writeInt(baos.size());
+      offset += 4;
+      baos.writeTo(o);
+      baos.reset();
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  /**
+   * @return the running byte count maintained by this stream
+   */
+  public long getCurrentOffset() {
+    return offset;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    baos.write(b);
+    offset++;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    baos.write(b);
+    offset += b.length;
+  }
+
+  @Override
+  public void write(byte[] b, int offset, int length) throws IOException {
+    baos.write(b, offset, length);
+    // The parameter named "offset" shadows the field, so the field must be qualified.
+    // Without "this." only the parameter was incremented: the byte counter stayed put
+    // and close() could skip the pending frame.
+    this.offset += length;
+  }
+
+  @Override
+  public void flush() {
+    // no-op.
+  }
+
+  /**
+   * Emits a final frame if anything has been counted so far, writes the end marker (-1)
+   * and closes the underlying stream. Calls after the first have no effect.
+   *
+   * @throws IOException if writing the frame or the end marker, or closing, fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      try {
+        if (this.offset > 0) {
+          nextFrame();
+        }
+      } catch (StorageException e) {
+        // nextFrame() only ever wraps an IOException, so the cast unwraps it again.
+        throw (IOException) e.getCause();
+      }
+      o.writeInt(-1);
+      o.close();
+      closed = true;
+    }
+  }
+
+  /**
+   * Returns an accumulator that writes each added byte array directly to the underlying
+   * stream as its own frame, bypassing the in-memory buffer.
+   *
+   * @return an accumulator over frames; closing it writes the end marker, closes the
+   * underlying stream and marks this stream as closed
+   */
+  @Override
+  public Accumulator<byte[]> accumulator() throws StorageException {
+    @SuppressWarnings("resource")
+    final FramingOutputStream fos = this;
+    return new Accumulator<byte[]>() {
+
+      @Override
+      public void add(byte[] datum) throws ServiceException {
+        try {
+          o.writeInt(datum.length);
+          offset += 4;
+          o.write(datum);
+          offset += datum.length;
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+
+      }
+
+      @Override
+      public void close() throws ServiceException {
+        try {
+          o.writeInt(-1);
+          offset += 4;
+          o.close();
+          fos.closed = true;
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+      }
+
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
new file mode 100644
index 0000000..80affdf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class FramingTupleDeserializer<K, V> implements
+    Deserializer<Tuple<K, V>, InputStream> {
+
+  private final Deserializer<K, InputStream> keyDeserializer;
+  private final Deserializer<V, InputStream> valDeserializer;
+
+  public FramingTupleDeserializer(Deserializer<K, InputStream> keyDeserializer,
+                                  Deserializer<V, InputStream> valDeserializer) {
+    this.keyDeserializer = keyDeserializer;
+    this.valDeserializer = valDeserializer;
+  }
+
+  @Override
+  public Iterable<Tuple<K, V>> create(InputStream ins) {
+    final DataInputStream in = new DataInputStream(ins);
+    final Iterable<K> keyItbl = keyDeserializer.create(in);
+    final Iterable<V> valItbl = valDeserializer.create(in);
+    return new Iterable<Tuple<K, V>>() {
+      @Override
+      public Iterator<Tuple<K, V>> iterator() {
+        final Iterator<K> keyIt = keyItbl.iterator();
+        final Iterator<V> valIt = valItbl.iterator();
+        try {
+          return new Iterator<Tuple<K, V>>() {
+
+            private int readFrameLength() throws ServiceException {
+              try {
+                return in.readInt();
+              } catch (IOException e) {
+                throw new ServiceRuntimeException(e);
+              }
+            }
+
+            int nextFrameLength = readFrameLength();
+
+            @Override
+            public boolean hasNext() {
+              return nextFrameLength != -1;
+            }
+
+            @Override
+            public Tuple<K, V> next() {
+              try {
+                if (nextFrameLength == -1) {
+                  throw new NoSuchElementException();
+                }
+                K k = keyIt.next();
+                readFrameLength();
+                V v = valIt.next();
+                nextFrameLength = readFrameLength();
+                return new Tuple<>(k, v);
+              } catch (ServiceException e) {
+                throw new ServiceRuntimeException(e);
+              }
+            }
+
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        } catch (ServiceException e) {
+          throw new ServiceRuntimeException(e);
+        }
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
new file mode 100644
index 0000000..a98babd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Serializes key/value {@link Tuple}s onto one {@link OutputStream} by
+ * delegating keys and values to separate serializers that share a single
+ * {@link FramingOutputStream}. A new frame is started between a tuple's key
+ * and its value, and between consecutive tuples.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class FramingTupleSerializer<K, V> implements
+    Serializer<Tuple<K, V>, OutputStream> {
+
+  private final Serializer<K, OutputStream> keySerializer;
+  private final Serializer<V, OutputStream> valSerializer;
+
+  /**
+   * @param keySerializer serializer used for the key of every tuple
+   * @param valSerializer serializer used for the value of every tuple
+   */
+  public FramingTupleSerializer(
+      final Serializer<K, OutputStream> keySerializer,
+      final Serializer<V, OutputStream> valSerializer) {
+    this.keySerializer = keySerializer;
+    this.valSerializer = valSerializer;
+  }
+
+  /**
+   * Wraps {@code os} in a framing stream and returns an accumulable whose
+   * accumulators write tuples to it.
+   *
+   * @param os the stream that receives the framed output
+   * @return an accumulable producing tuple accumulators over {@code os}
+   */
+  @Override
+  public Accumulable<Tuple<K, V>> create(final OutputStream os) {
+    final FramingOutputStream framedOut = new FramingOutputStream(os);
+
+    return new Accumulable<Tuple<K, V>>() {
+
+      @Override
+      public Accumulator<Tuple<K, V>> accumulator() throws ServiceException {
+        // Both delegates write into the same framing stream.
+        final Accumulator<K> keys =
+            keySerializer.create(framedOut).accumulator();
+        final Accumulator<V> values =
+            valSerializer.create(framedOut).accumulator();
+
+        return new Accumulator<Tuple<K, V>>() {
+          /** Becomes true once the first tuple has been handed to add(). */
+          private boolean started = false;
+
+          @Override
+          public void add(Tuple<K, V> datum) throws ServiceException {
+            if (started) {
+              // Separate this tuple from the value of the previous one.
+              framedOut.nextFrame();
+            } else {
+              started = true;
+            }
+            keys.add(datum.getKey());
+            // Separate the key from the value.
+            framedOut.nextFrame();
+            values.add(datum.getValue());
+          }
+
+          @Override
+          public void close() throws ServiceException {
+            try {
+              keys.close();
+              values.close();
+              framedOut.close();
+            } catch (IOException e) {
+              throw new StorageException(e);
+            }
+          }
+        };
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
new file mode 100644
index 0000000..2dfcc18
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.storage.util.TupleKeyComparator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Merges several iterators into a single iterator. The next element of every
+ * non-exhausted input is kept in a priority queue, and {@link #next()} hands
+ * out the head of that queue.
+ * NOTE(review): the queue order comes from {@link TupleKeyComparator}, which
+ * presumably compares tuples by key using the supplied comparator; if so, and
+ * if each input is sorted under that comparator, the merged output is sorted
+ * as well - confirm against TupleKeyComparator.
+ *
+ * @param <T> the element type
+ */
+public class MergingIterator<T> implements Iterator<T> {
+  /** Pairs the pending element of each non-exhausted input with that input. */
+  private final PriorityQueue<Tuple<T, Iterator<T>>> heap;
+
+  /**
+   * @param c   comparator handed to the queue's tuple comparator
+   * @param its the iterators to merge; each is advanced by one element here
+   */
+  public MergingIterator(final Comparator<T> c, Iterator<T>[] its) {
+    this.heap = new PriorityQueue<Tuple<T, Iterator<T>>>(11,
+        new TupleKeyComparator<T, Iterator<T>>(c));
+
+    for (final Iterator<T> it : its) {
+      // Ask hasNext() directly instead of using null as an "empty" marker, so
+      // an input whose first element is null is not silently discarded along
+      // with everything behind it. This matches how next() refills the queue.
+      if (it.hasNext()) {
+        heap.add(new Tuple<>(it.next(), it));
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !heap.isEmpty();
+  }
+
+  /**
+   * Removes the head of the queue, refills the queue from the input that head
+   * came from (if that input has more elements) and returns the head's element.
+   *
+   * @throws java.util.NoSuchElementException if all inputs are exhausted
+   */
+  @Override
+  public T next() {
+    // PriorityQueue.remove() throws NoSuchElementException on an empty queue.
+    final Tuple<T, Iterator<T>> head = heap.remove();
+    final Iterator<T> source = head.getValue();
+    if (source.hasNext()) {
+      heap.add(new Tuple<>(source.next(), source));
+    }
+    return head.getKey();
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException(
+        "Cannot remove entries from MergingIterator!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
new file mode 100644
index 0000000..0d7353a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+/**
+ * A scratch area whose space figures can be queried and whose contents can be
+ * deleted.
+ */
+public interface ScratchSpace {
+  /**
+   * @return the space available in this scratch space.
+   * NOTE(review): units and the meaning of special values are not defined
+   * here; the local implementation returns its quota, where zero denotes
+   * "unlimited" - confirm the intended contract.
+   */
+  long availableSpace();
+
+  /**
+   * @return the space currently used by this scratch space.
+   * NOTE(review): presumably bytes - the local implementation sums
+   * {@code File.length()} over its files; confirm.
+   */
+  long usedSpace();
+
+  /**
+   * Deletes the contents of this scratch space.
+   */
+  void delete();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
new file mode 100644
index 0000000..1002b76
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+/**
+ * A storage service that gives access to a {@link ScratchSpace}.
+ */
+public interface StorageService {
+
+  /**
+   * @return the scratch space provided by this service
+   */
+  ScratchSpace getScratchSpace();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
new file mode 100644
index 0000000..c5413e6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * An {@link Accumulable} backed by a scratch file obtained from a
+ * {@link LocalStorageService}. Every accumulator it hands out is a
+ * {@link CodecFileAccumulator} over that same file and codec.
+ *
+ * @param <T> the element type
+ * @param <C> the codec type used to encode elements
+ */
+public final class CodecFileAccumulable<T, C extends Codec<T>> implements Accumulable<T> {
+
+  private final File file;
+  private final C codec;
+
+  /**
+   * @param s     the storage service whose scratch space supplies the file
+   * @param codec the codec passed on to every accumulator
+   */
+  public CodecFileAccumulable(final LocalStorageService s, final C codec) {
+    this.codec = codec;
+    this.file = s.getScratchSpace().newFile();
+  }
+
+  /**
+   * @return the pathname of the backing file
+   */
+  public String getName() {
+    return file.getPath();
+  }
+
+  /**
+   * Creates a new accumulator over the backing file.
+   *
+   * @return the new accumulator
+   * @throws StorageException if the accumulator cannot be created
+   */
+  @Override
+  public Accumulator<T> accumulator() throws StorageException {
+    final Accumulator<T> created;
+    try {
+      created = new CodecFileAccumulator<>(codec, file);
+    } catch (final IOException e) {
+      throw new StorageException(e);
+    }
+    return created;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
new file mode 100644
index 0000000..3d6d7cb
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+
+/**
+ * Writes elements to a file as length-prefixed frames: each element is encoded
+ * with the codec and written as an int length followed by the encoded bytes.
+ * {@link #close()} writes a length of -1 as the end-of-data marker.
+ *
+ * @param <T> the element type
+ */
+final class CodecFileAccumulator<T> implements Accumulator<T> {
+
+  private final Codec<T> codec;
+  private final ObjectOutputStream out;
+
+  /**
+   * @param codec the codec used to encode every element
+   * @param file  the file to write to
+   * @throws IOException if the file cannot be opened for writing
+   */
+  public CodecFileAccumulator(final Codec<T> codec, final File file) throws IOException {
+    this.codec = codec;
+    this.out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
+  }
+
+  /**
+   * Encodes {@code datum} and appends it as one frame.
+   *
+   * @param datum the element to write
+   * @throws StorageException if writing to the file fails
+   */
+  @Override
+  public void add(final T datum) throws ServiceException {
+    final byte[] buf = codec.encode(datum);
+    try {
+      this.out.writeInt(buf.length);
+      this.out.write(buf);
+    } catch (final IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  /**
+   * Writes the end-of-data marker and closes the file.
+   *
+   * @throws ServiceException if writing the marker or closing the file fails
+   */
+  @Override
+  public void close() throws ServiceException {
+    // try-with-resources closes the stream even when writing the marker
+    // fails, so a write error can no longer leak the open file handle.
+    try (ObjectOutputStream stream = this.out) {
+      stream.writeInt(-1);
+    } catch (final IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
new file mode 100644
index 0000000..9545d19
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A read-only, file-based spool. The spool file has to be produced by some
+ * other process; this class only hands out iterators that decode it.
+ *
+ * @param <T> the element type
+ * @param <C> the codec type used to decode elements
+ */
+public class CodecFileIterable<T, C extends Codec<T>> implements Iterable<T> {
+  private final File file;
+  private final C codec;
+
+  /**
+   * @param filename the spool file to read
+   * @param codec    the codec passed on to every iterator
+   */
+  public CodecFileIterable(final File filename, final C codec) {
+    this.codec = codec;
+    this.file = filename;
+  }
+
+  /**
+   * Opens the spool file and returns an iterator over its elements.
+   *
+   * @return a new iterator over the file
+   * @throws ServiceRuntimeException wrapping a {@link StorageException} if the
+   *                                 file cannot be opened
+   */
+  @SuppressWarnings("resource")
+  @Override
+  public Iterator<T> iterator() {
+    final Iterator<T> fileIterator;
+    try {
+      fileIterator = new CodecFileIterator<>(codec, file);
+    } catch (final IOException e) {
+      throw new ServiceRuntimeException(new StorageException(e));
+    }
+    return fileIterator;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
new file mode 100644
index 0000000..69538e0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+final class CodecFileIterator<T> implements Iterator<T> {
+
+  private final Codec<T> codec;
+  private final ObjectInputStream in;
+  private int sz = 0;
+
+  CodecFileIterator(final Codec<T> codec, final File file) throws IOException {
+    this.in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
+    this.codec = codec;
+    this.readNextSize();
+  }
+
+  private void readNextSize() throws IOException {
+    if (this.hasNext()) {
+      try {
+        this.sz = this.in.readInt();
+        if (this.sz == -1) {
+          this.in.close();
+        }
+      } catch (final IOException ex) {
+        this.sz = -1; // Don't read from that file again.
+        this.in.close();
+        throw ex;
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return this.sz != -1;
+  }
+
+  @Override
+  public T next() {
+    if (!this.hasNext()) {
+      throw new NoSuchElementException("Moving past the end of the file.");
+    }
+    try {
+      final byte[] buf = new byte[this.sz];
+      for (int rem = buf.length; rem > 0; ) {
+        rem -= this.in.read(buf, buf.length - rem, rem);
+      }
+      this.readNextSize();
+      return this.codec.decode(buf);
+    } catch (final IOException e) {
+      throw new ServiceRuntimeException(new StorageException(e));
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Attempt to remove value from read-only input file!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
new file mode 100644
index 0000000..34f0f5c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.io.storage.ScratchSpace;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * A {@link ScratchSpace} that creates temporary files on the local filesystem
+ * and remembers them so that their size can be summed and they can be deleted.
+ */
+public class LocalScratchSpace implements ScratchSpace {
+
+  private final String jobName;
+  private final String evaluatorName;
+  private final Set<File> tempFiles = new ConcurrentSkipListSet<File>();
+  /**
+   * Zero denotes "unlimited"
+   */
+  private final long quota;
+
+  /**
+   * Creates a scratch space with a quota of zero ("unlimited").
+   *
+   * @param jobName       used in the names of the temporary files
+   * @param evaluatorName used in the names of the temporary files
+   */
+  public LocalScratchSpace(String jobName, String evaluatorName) {
+    this(jobName, evaluatorName, 0);
+  }
+
+  /**
+   * @param jobName       used in the names of the temporary files
+   * @param evaluatorName used in the names of the temporary files
+   * @param quota         the quota reported by {@link #availableSpace()};
+   *                      zero denotes "unlimited"
+   */
+  public LocalScratchSpace(String jobName, String evaluatorName, long quota) {
+    this.jobName = jobName;
+    this.evaluatorName = evaluatorName;
+    this.quota = quota;
+  }
+
+  /**
+   * Creates a new temporary file and registers it with this scratch space.
+   *
+   * @return the newly created file
+   * @throws RuntimeException wrapping the IOException if the file cannot be
+   *                          created
+   */
+  public File newFile() {
+    final File ret;
+    try {
+      // createTempFile appends the suffix verbatim, so it has to carry its
+      // own dot; a bare "tmp" yields names ending in "...1234tmp".
+      ret = File.createTempFile("reef-" + jobName + "-" + evaluatorName,
+          ".tmp");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    tempFiles.add(ret);
+    return ret;
+  }
+
+  /**
+   * @return the configured quota; the space already used is not subtracted.
+   * NOTE(review): with zero meaning "unlimited", callers cannot tell "no
+   * space" from "no limit" - confirm the intended contract.
+   */
+  @Override
+  public long availableSpace() {
+    return quota;
+  }
+
+  /**
+   * @return the sum of the lengths of all files handed out by
+   * {@link #newFile()} that have not been deleted through {@link #delete()}
+   */
+  @Override
+  public long usedSpace() {
+    long ret = 0;
+    for (File f : tempFiles) {
+      // TODO: Error handling...
+      ret += f.length();
+    }
+    return ret;
+  }
+
+  /**
+   * Deletes every registered file on a best-effort basis and forgets them.
+   */
+  @Override
+  public void delete() {
+    // TODO: Error handling. Files.delete() would give us an exception. We
+    // should pass a set of Exceptions into a ReefRuntimeException.
+    for (File f : tempFiles) {
+      f.delete();
+    }
+    tempFiles.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
new file mode 100644
index 0000000..daae6cd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.io.storage.StorageService;
+
+
+/**
+ * A {@link StorageService} backed by a single {@link LocalScratchSpace} that
+ * is created together with the service.
+ */
+public class LocalStorageService implements StorageService {
+  @SuppressWarnings("unused")
+  private final String jobName;
+  @SuppressWarnings("unused")
+  private final String evaluatorName;
+
+  private final LocalScratchSpace scratchSpace;
+
+  /**
+   * @param jobName       passed on to the scratch space
+   * @param evaluatorName passed on to the scratch space
+   */
+  public LocalStorageService(String jobName, String evaluatorName) {
+    this.scratchSpace = new LocalScratchSpace(jobName, evaluatorName);
+    this.jobName = jobName;
+    this.evaluatorName = evaluatorName;
+  }
+
+  /**
+   * @return the scratch space owned by this service; the same instance on
+   * every call
+   */
+  @Override
+  public LocalScratchSpace getScratchSpace() {
+    return this.scratchSpace;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
new file mode 100644
index 0000000..e9a4f41
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+import org.apache.reef.io.serialization.Deserializer;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.*;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+/**
+ * A {@link Spool} that keeps its data in a scratch file on the filesystem.
+ * Elements are written through the single accumulator; once that accumulator
+ * has been closed, the file can be read back through {@link #iterator()}.
+ *
+ * @param <T> the element type
+ */
+public final class SerializerFileSpool<T> implements Spool<T> {
+
+  private final File file;
+  private final Accumulator<T> accumulator;
+  private final Deserializer<T, InputStream> deserializer;
+  /** True until the accumulator is closed; guards add() and iterator(). */
+  private boolean canAppend = true;
+  /** True until accumulator() has been called once. */
+  private boolean canGetAccumulator = true;
+
+  /**
+   * Creates the scratch file and opens it for writing.
+   *
+   * @param service the storage service whose scratch space supplies the file
+   * @param out     serializer used to write elements to the file
+   * @param in      deserializer used to read elements back from the file
+   * @throws ServiceException      if the underlying accumulator cannot be created
+   * @throws IllegalStateException if the scratch file cannot be opened
+   */
+  public SerializerFileSpool(final LocalStorageService service,
+                             final Serializer<T, OutputStream> out, final Deserializer<T, InputStream> in)
+      throws ServiceException {
+    this.file = service.getScratchSpace().newFile();
+
+    final Accumulable<T> sink;
+    try {
+      final OutputStream fileOut =
+          new BufferedOutputStream(new FileOutputStream(this.file));
+      sink = out.create(fileOut);
+    } catch (final FileNotFoundException e) {
+      throw new IllegalStateException(
+          "Unable to create temporary file:" + file, e);
+    }
+    this.deserializer = in;
+
+    final Accumulator<T> delegate = sink.accumulator();
+    // Wrap the real accumulator so that writes are rejected after close().
+    this.accumulator = new Accumulator<T>() {
+      @Override
+      public void add(final T datum) throws ServiceException {
+        if (canAppend) {
+          delegate.add(datum);
+          return;
+        }
+        throw new ConcurrentModificationException(
+            "Attempt to append after creating iterator!");
+      }
+
+      @Override
+      public void close() throws ServiceException {
+        canAppend = false;
+        delegate.close();
+      }
+    };
+  }
+
+  /**
+   * Opens the scratch file and returns an iterator over its elements.
+   *
+   * @return an iterator created by the deserializer over the file
+   * @throws IllegalStateException   if the accumulator has not been closed yet
+   * @throws ServiceRuntimeException if the file cannot be opened
+   */
+  @Override
+  public Iterator<T> iterator() {
+    if (canAppend) {
+      throw new IllegalStateException(
+          "Need to call close() on accumulator before calling iterator()!");
+    }
+    try {
+      final InputStream fileIn =
+          new BufferedInputStream(new FileInputStream(file));
+      return deserializer.create(fileIn).iterator();
+    } catch (final IOException e) {
+      throw new ServiceRuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the accumulator of this spool; may be called only once.
+   *
+   * @return the accumulator that writes to the scratch file
+   * @throws UnsupportedOperationException on every call after the first
+   */
+  @Override
+  public Accumulator<T> accumulator() {
+    if (canGetAccumulator) {
+      canGetAccumulator = false;
+      return this.accumulator;
+    }
+    throw new UnsupportedOperationException("Can only getAccumulator() once!");
+  }
+}