You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:59 UTC
[26/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
new file mode 100644
index 0000000..f16facd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
new file mode 100644
index 0000000..73a1cd4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/AvroUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Package-private helpers that serialize avro objects to byte arrays and read them back.
+ */
+final class AvroUtils {
+
+ private AvroUtils() { // never instantiated: the class only offers static helpers
+ }
+
+ /**
+ * Serializes the given avro object to a byte[] through an Avro binary encoder.
+ *
+ * @param avroObject the object to serialize
+ * @param theClass the class handed to the SpecificDatumWriter that writes the object
+ * @param <T> the type of the avro object
+ * @return the serialized bytes; an IOException is rethrown wrapped in a RuntimeException
+ */
+ static final <T> byte[] toBytes(T avroObject, Class<T> theClass) {
+ final DatumWriter<T> datumWriter = new SpecificDatumWriter<>(theClass);
+ final byte[] theBytes;
+ try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ datumWriter.write(avroObject, encoder);
+ encoder.flush(); // push anything still held by the encoder into 'out' before it is copied
+ out.flush();
+ theBytes = out.toByteArray();
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to serialize an avro object", e);
+ }
+ return theBytes;
+ }
+
+ static <T> T fromBytes(final byte[] theBytes, final Class<T> theClass) { // reads one T from Avro binary data
+ final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null);
+ final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass);
+ try {
+ return reader.read(null, decoder);
+ } catch (final IOException e) { // rethrown unchecked with the original exception as the cause
+ throw new RuntimeException("Failed to deserialize an avro object", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
new file mode 100644
index 0000000..eda5f15
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Naming lookup request carrying an iterable of identifiers.
+ */
+public class NamingLookupRequest extends NamingMessage {
+ private final Iterable<Identifier> ids;
+
+ /**
+ * Constructs a naming lookup request
+ *
+ * @param ids the iterable of identifiers; it is stored by reference, not copied
+ */
+ public NamingLookupRequest(final Iterable<Identifier> ids) {
+ this.ids = ids;
+ }
+
+ /**
+ * Gets identifiers
+ *
+ * @return the iterable of identifiers that was passed to the constructor
+ */
+ public Iterable<Identifier> getIdentifiers() {
+ return ids;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
new file mode 100644
index 0000000..b388ba6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupRequestCodec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.avro.AvroNamingLookupRequest;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Naming lookup request codec: converts a NamingLookupRequest to and from Avro-encoded bytes.
+ */
+public final class NamingLookupRequestCodec implements Codec<NamingLookupRequest> {
+
+ private final IdentifierFactory factory;
+
+ /**
+ * Constructs a naming lookup request codec
+ *
+ * @param factory the identifier factory used by decode to rebuild identifiers from strings
+ */
+ @Inject
+ public NamingLookupRequestCodec(final IdentifierFactory factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Encodes the string form of each identifier in the request as an AvroNamingLookupRequest.
+ *
+ * @param obj the naming lookup request
+ * @return a byte array
+ */
+ @Override
+ public byte[] encode(final NamingLookupRequest obj) {
+ final List<CharSequence> ids = new ArrayList<>();
+ for (final Identifier id : obj.getIdentifiers()) {
+ ids.add(id.toString());
+ }
+ return AvroUtils.toBytes(AvroNamingLookupRequest.newBuilder().setIds(ids).build(), AvroNamingLookupRequest.class);
+ }
+
+ /**
+ * Decodes the bytes to a naming lookup request, rebuilding each identifier through the factory.
+ *
+ * @param buf the byte array
+ * @return a naming lookup request holding the decoded identifiers in the order they were read
+ */
+ @Override
+ public NamingLookupRequest decode(final byte[] buf) {
+ final AvroNamingLookupRequest req = AvroUtils.fromBytes(buf, AvroNamingLookupRequest.class);
+
+ final List<Identifier> ids = new ArrayList<Identifier>(req.getIds().size());
+ for (final CharSequence s : req.getIds()) {
+ ids.add(factory.getNewInstance(s.toString()));
+ }
+ return new NamingLookupRequest(ids);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
new file mode 100644
index 0000000..822f170
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponse.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+
+import java.util.List;
+
+/**
+ * Naming lookup response holding a list of name assignments.
+ */
+public class NamingLookupResponse extends NamingMessage {
+ private final List<NameAssignment> nas;
+
+ /**
+ * Constructs a naming lookup response
+ *
+ * @param nas the list of name assignments; it is stored by reference, not copied
+ */
+ public NamingLookupResponse(List<NameAssignment> nas) {
+ this.nas = nas;
+ }
+
+ /**
+ * Gets name assignments
+ *
+ * @return the list of name assignments that was passed to the constructor
+ */
+ public List<NameAssignment> getNameAssignments() {
+ return nas;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
new file mode 100644
index 0000000..affe3b6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingLookupResponseCodec.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.NameAssignmentTuple;
+import org.apache.reef.io.network.naming.avro.AvroNamingAssignment;
+import org.apache.reef.io.network.naming.avro.AvroNamingLookupResponse;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Naming lookup response codec: converts a NamingLookupResponse to and from Avro-encoded bytes.
+ */
+public final class NamingLookupResponseCodec implements Codec<NamingLookupResponse> {
+
+ private final IdentifierFactory factory;
+
+ /**
+ * Constructs a naming lookup response codec
+ *
+ * @param factory the identifier factory used by decode to rebuild identifiers from strings
+ */
+ @Inject
+ public NamingLookupResponseCodec(final IdentifierFactory factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Encodes each name assignment as an (id, host, port) AvroNamingAssignment and serializes the list.
+ *
+ * @param obj the naming lookup response
+ * @return a byte array
+ */
+ @Override
+ public byte[] encode(NamingLookupResponse obj) {
+ final List<AvroNamingAssignment> assignments = new ArrayList<>(obj.getNameAssignments().size());
+ for (final NameAssignment nameAssignment : obj.getNameAssignments()) {
+ assignments.add(AvroNamingAssignment.newBuilder()
+ .setId(nameAssignment.getIdentifier().toString())
+ .setHost(nameAssignment.getAddress().getHostName()) // NOTE(review): getHostName() may do a reverse lookup for literal-IP addresses - confirm intended
+ .setPort(nameAssignment.getAddress().getPort())
+ .build());
+ }
+ return AvroUtils.toBytes(
+ AvroNamingLookupResponse.newBuilder().setTuples(assignments).build(), AvroNamingLookupResponse.class
+ );
+ }
+
+ /**
+ * Decodes bytes to a naming lookup response, one NameAssignmentTuple per decoded tuple.
+ *
+ * @param buf the byte array
+ * @return a naming lookup response
+ * @throws NamingRuntimeException NOTE(review): no throw is visible here; AvroUtils.fromBytes throws a plain RuntimeException - confirm
+ */
+ @Override
+ public NamingLookupResponse decode(final byte[] buf) {
+ final AvroNamingLookupResponse avroResponse = AvroUtils.fromBytes(buf, AvroNamingLookupResponse.class);
+ final List<NameAssignment> nas = new ArrayList<NameAssignment>(avroResponse.getTuples().size());
+ for (final AvroNamingAssignment tuple : avroResponse.getTuples()) {
+ nas.add(
+ new NameAssignmentTuple(
+ factory.getNewInstance(tuple.getId().toString()),
+ new InetSocketAddress(tuple.getHost().toString(), tuple.getPort())
+ )
+ );
+ }
+ return new NamingLookupResponse(nas);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
new file mode 100644
index 0000000..3152854
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.remote.transport.Link;
+
+/**
+ * An abstract base class for naming messages; it holds a transient link that can be set and read.
+ */
+public abstract class NamingMessage {
+ private transient Link<byte[]> link;
+
+ /**
+ * Gets the link most recently passed to setLink
+ *
+ * @return the link, or null if setLink has not been called
+ */
+ public Link<byte[]> getLink() {
+ return link;
+ }
+
+ /**
+ * Sets the link, replacing any previously stored one
+ *
+ * @param link the link
+ */
+ public void setLink(Link<byte[]> link) {
+ this.link = link;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
new file mode 100644
index 0000000..e6e1e9c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.naming.NameAssignment;
+
+/**
+ * Naming registration request wrapping a single name assignment.
+ */
+public class NamingRegisterRequest extends NamingMessage {
+ private final NameAssignment na;
+
+ /**
+ * Constructs a naming registration request
+ *
+ * @param na the name assignment carried by this request
+ */
+ public NamingRegisterRequest(NameAssignment na) {
+ this.na = na;
+ }
+
+ /**
+ * Gets the name assignment
+ *
+ * @return the name assignment that was passed to the constructor
+ */
+ public NameAssignment getNameAssignment() {
+ return na;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
new file mode 100644
index 0000000..c32d888
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.NameAssignmentTuple;
+import org.apache.reef.io.network.naming.avro.AvroNamingRegisterRequest;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Naming registration request codec: converts a NamingRegisterRequest to and from Avro-encoded bytes.
+ */
+public class NamingRegisterRequestCodec implements Codec<NamingRegisterRequest> {
+
+ private final IdentifierFactory factory;
+
+ /**
+ * Constructs a naming registration request codec
+ *
+ * @param factory the identifier factory used by decode to rebuild the identifier from a string
+ */
+ public NamingRegisterRequestCodec(IdentifierFactory factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Encodes the identifier, host name and port of the request's name assignment to bytes
+ *
+ * @param obj the naming registration request
+ * @return a byte array
+ */
+ @Override
+ public byte[] encode(NamingRegisterRequest obj) {
+ final AvroNamingRegisterRequest result = AvroNamingRegisterRequest.newBuilder()
+ .setId(obj.getNameAssignment().getIdentifier().toString())
+ .setHost(obj.getNameAssignment().getAddress().getHostName()) // NOTE(review): getHostName() may do a reverse lookup for literal-IP addresses - confirm intended
+ .setPort(obj.getNameAssignment().getAddress().getPort())
+ .build();
+ return AvroUtils.toBytes(result, AvroNamingRegisterRequest.class);
+ }
+
+ /**
+ * Decodes the bytes to a naming registration request wrapping a NameAssignmentTuple
+ *
+ * @param buf the byte array
+ * @return a naming registration request
+ * @throws NamingRuntimeException NOTE(review): no throw is visible here; AvroUtils.fromBytes throws a plain RuntimeException - confirm
+ */
+ @Override
+ public NamingRegisterRequest decode(byte[] buf) {
+ final AvroNamingRegisterRequest avroNamingRegisterRequest = AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class);
+ return new NamingRegisterRequest(
+ new NameAssignmentTuple(factory.getNewInstance(avroNamingRegisterRequest.getId().toString()),
+ new InetSocketAddress(avroNamingRegisterRequest.getHost().toString(), avroNamingRegisterRequest.getPort()))
+ );
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
new file mode 100644
index 0000000..b8c416f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+/**
+ * Naming registration response; it wraps the naming register request given to its constructor.
+ */
+public class NamingRegisterResponse extends NamingMessage {
+ private final NamingRegisterRequest request;
+
+ /**
+ * Constructs a naming register response
+ *
+ * @param request the naming register request carried by this response
+ */
+ public NamingRegisterResponse(NamingRegisterRequest request) {
+ this.request = request;
+ }
+
+ /**
+ * Gets the naming register request
+ *
+ * @return the naming register request that was passed to the constructor
+ */
+ public NamingRegisterRequest getRequest() {
+ return request;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
new file mode 100644
index 0000000..f95a90e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterResponseCodec.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.remote.Codec;
+
+/**
+ * Naming registration response codec; it delegates to a NamingRegisterRequestCodec for the wrapped request.
+ */
+public class NamingRegisterResponseCodec implements Codec<NamingRegisterResponse> {
+ private final NamingRegisterRequestCodec codec;
+
+ /**
+ * Constructs a naming register response codec
+ *
+ * @param codec the naming register request codec that performs the actual encoding and decoding
+ */
+ public NamingRegisterResponseCodec(NamingRegisterRequestCodec codec) {
+ this.codec = codec;
+ }
+
+ /**
+ * Encodes a naming register response to bytes by encoding the request it wraps
+ *
+ * @param obj the naming register response
+ * @return a byte array, as produced by the request codec for the wrapped request
+ */
+ @Override
+ public byte[] encode(NamingRegisterResponse obj) {
+ return codec.encode(obj.getRequest());
+ }
+
+ /**
+ * Decodes a naming register response from the bytes by decoding a request and wrapping it
+ *
+ * @param buf the byte array
+ * @return a naming register response wrapping the decoded request
+ */
+ @Override
+ public NamingRegisterResponse decode(byte[] buf) {
+ return new NamingRegisterResponse(codec.decode(buf));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
new file mode 100644
index 0000000..770b7c9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Naming un-registration request wrapping a single identifier.
+ */
+public class NamingUnregisterRequest extends NamingMessage {
+ private final Identifier id;
+
+ /**
+ * Constructs a naming un-registration request
+ *
+ * @param id the identifier carried by this request
+ */
+ public NamingUnregisterRequest(Identifier id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the identifier
+ *
+ * @return the identifier that was passed to the constructor
+ */
+ public Identifier getIdentifier() {
+ return id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
new file mode 100644
index 0000000..ea505c3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingUnregisterRequestCodec.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.serialization;
+
+import org.apache.reef.io.network.naming.avro.AvroNamingUnRegisterRequest;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+/**
+ * Codec translating {@link NamingUnregisterRequest} objects to and from bytes
+ * by way of the {@code AvroNamingUnRegisterRequest} Avro record.
+ */
+public final class NamingUnregisterRequestCodec implements Codec<NamingUnregisterRequest> {
+
+  /** Factory used to rebuild identifiers from their string form when decoding. */
+  private final IdentifierFactory factory;
+
+  /**
+   * Creates a codec that uses the given factory to re-create identifiers.
+   *
+   * @param factory the identifier factory
+   */
+  @Inject
+  public NamingUnregisterRequestCodec(final IdentifierFactory factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Serializes an un-registration request; only the string form of its identifier is stored.
+   *
+   * @param obj the naming un-registration request
+   * @return the bytes returned by {@code AvroUtils.toBytes}
+   */
+  @Override
+  public byte[] encode(NamingUnregisterRequest obj) {
+    final String idText = obj.getIdentifier().toString();
+    final AvroNamingUnRegisterRequest avroRequest =
+        AvroNamingUnRegisterRequest.newBuilder().setId(idText).build();
+    return AvroUtils.toBytes(avroRequest, AvroNamingUnRegisterRequest.class);
+  }
+
+  /**
+   * Rebuilds an un-registration request from its serialized form.
+   *
+   * @param buf the byte array
+   * @return a request whose identifier is created by the factory from the stored string
+   * @throws NamingRuntimeException presumably raised by {@code AvroUtils.fromBytes} when the
+   *                                bytes cannot be read (that helper is not visible here)
+   */
+  @Override
+  public NamingUnregisterRequest decode(byte[] buf) {
+    final AvroNamingUnRegisterRequest avroRequest =
+        AvroUtils.fromBytes(buf, AvroNamingUnRegisterRequest.class);
+    final String idText = avroRequest.getId().toString();
+    return new NamingUnregisterRequest(this.factory.getNewInstance(idText));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
new file mode 100644
index 0000000..d4ff9d3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Contains naming serialization codecs
+ */
+package org.apache.reef.io.network.naming.serialization;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
new file mode 100644
index 0000000..4c53530
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
new file mode 100644
index 0000000..5e68871
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/ListCodec.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Codec for lists that delegates the encoding of every element to another codec.
+ * <p>
+ * Each element is stored as a 4-byte length (written with
+ * {@link DataOutputStream#writeInt(int)}) followed by the element's encoded bytes.
+ *
+ * @param <T> the element type
+ */
+public final class ListCodec<T> implements Codec<List<T>> {
+
+  private static final Logger LOG = Logger.getLogger(ListCodec.class.getName());
+
+  /** Codec applied to the individual elements. */
+  private final Codec<T> codec;
+
+  /**
+   * @param codec the codec used for each element of a list
+   */
+  public ListCodec(final Codec<T> codec) {
+    this.codec = codec;
+  }
+
+  /**
+   * Small demo: encodes a list of strings, decodes it again and logs both at FINEST level.
+   *
+   * @param args not used
+   */
+  public static void main(final String[] args) {
+    final List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five");
+    LOG.log(Level.FINEST, "Input: {0}", input);
+    final ListCodec<String> listCodec = new ListCodec<>(new StringCodec());
+    final byte[] encoded = listCodec.encode(input);
+    final List<String> decoded = listCodec.decode(encoded);
+    LOG.log(Level.FINEST, "Output: {0}", decoded);
+  }
+
+  /**
+   * Encodes the elements in iteration order, each one prefixed by its encoded length.
+   *
+   * @param list the list to encode
+   * @return the concatenated length-prefixed element encodings
+   * @throws RuntimeException wrapping any {@link IOException} raised while writing
+   */
+  @Override
+  public byte[] encode(final List<T> list) {
+    try (final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         final DataOutputStream out = new DataOutputStream(buffer)) {
+      for (final T element : list) {
+        final byte[] elementBytes = this.codec.encode(element);
+        out.writeInt(elementBytes.length);
+        out.write(elementBytes);
+      }
+      return buffer.toByteArray();
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Error in list encoding", ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Decodes length-prefixed elements until the input is exhausted.
+   *
+   * @param list the bytes produced by {@link #encode(List)}
+   * @return the decoded elements in their stored order
+   * @throws RuntimeException wrapping any {@link IOException} raised while reading
+   */
+  @Override
+  public List<T> decode(final byte[] list) {
+    final List<T> decoded = new ArrayList<>();
+    try (final DataInputStream in = new DataInputStream(new ByteArrayInputStream(list))) {
+      while (in.available() > 0) {
+        final byte[] elementBytes = new byte[in.readInt()];
+        in.readFully(elementBytes);
+        decoded.add(this.codec.decode(elementBytes));
+      }
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Error in list decoding", ex);
+      throw new RuntimeException(ex);
+    }
+    return decoded;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
new file mode 100644
index 0000000..f1a3a07
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Pair.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import java.io.Serializable;
+
+public final class Pair<T1, T2> implements Serializable {
+
+ public final T1 first;
+ public final T2 second;
+
+ private String pairStr = null;
+
+ public Pair(final T1 first, final T2 second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ @Override
+ public String toString() {
+ if (this.pairStr == null) {
+ this.pairStr = "(" + this.first + "," + this.second + ")";
+ }
+ return this.pairStr;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
new file mode 100644
index 0000000..e4013bf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringCodec.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+
+/**
+ * Codec that converts strings to and from their UTF-8 byte representation.
+ * <p>
+ * The charset is fixed so that bytes produced on one JVM decode to the same string on
+ * another JVM, independent of either machine's platform default charset.
+ */
+public class StringCodec implements Codec<String> {
+
+  /** Charset used for both directions of the conversion. */
+  private static final java.nio.charset.Charset CHARSET =
+      java.nio.charset.StandardCharsets.UTF_8;
+
+  @Inject
+  public StringCodec() {
+    // Intentionally blank
+  }
+
+  /**
+   * Encodes a string as UTF-8.
+   *
+   * @param obj the string to encode
+   * @return the UTF-8 bytes of {@code obj}
+   */
+  @Override
+  public byte[] encode(String obj) {
+    return obj.getBytes(CHARSET);
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a string.
+   *
+   * @param buf the bytes to decode
+   * @return the decoded string
+   */
+  @Override
+  public String decode(byte[] buf) {
+    return new String(buf, CHARSET);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
new file mode 100644
index 0000000..5d6454b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifier.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.ComparableIdentifier;
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Identifier backed by a plain string.
+ */
+public class StringIdentifier implements ComparableIdentifier {
+
+  /** The wrapped string; {@link #compareTo(Identifier)} tolerates {@code null} here. */
+  private final String str;
+
+  /**
+   * Constructs a string identifier
+   *
+   * @param str a string
+   */
+  StringIdentifier(String str) {
+    this.str = str;
+  }
+
+  /**
+   * Returns a hash code for the object
+   *
+   * @return the hash code of the wrapped string, or 0 when the string is {@code null}
+   */
+  @Override
+  public int hashCode() {
+    return str == null ? 0 : str.hashCode();
+  }
+
+  /**
+   * Checks that another object is equal to this object
+   *
+   * @param o another object
+   * @return true if {@code o} is a {@code StringIdentifier} with the same string form;
+   * false otherwise, including for {@code null} and for objects of other types
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Guard the type instead of casting blindly: equals must not throw for foreign objects.
+    if (!(o instanceof StringIdentifier)) {
+      return false;
+    }
+    final String other = o.toString();
+    return str == null ? other == null : str.equals(other);
+  }
+
+  /**
+   * Returns a string representation of the object
+   *
+   * @return the wrapped string
+   */
+  @Override
+  public String toString() {
+    return str;
+  }
+
+  /**
+   * Compares the wrapped string with the string form of another identifier.
+   * A {@code null} identifier and a {@code null} wrapped string sort first.
+   *
+   * @param o the identifier to compare with, may be {@code null}
+   * @return a negative, zero or positive value as defined by {@link String#compareTo(String)}
+   */
+  @Override
+  public int compareTo(Identifier o) {
+    if (o == null) {
+      return str == null ? 0 : 1;
+    }
+    if (str == null) {
+      return -1;
+    }
+    return str.compareTo(o.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
new file mode 100644
index 0000000..9886a01
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/StringIdentifierFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * {@link IdentifierFactory} implementation that produces {@link StringIdentifier} objects.
+ */
+public class StringIdentifierFactory implements IdentifierFactory {
+
+  /**
+   * Injectable no-argument constructor.
+   */
+  @Inject
+  public StringIdentifierFactory() {
+    // nothing to initialize
+  }
+
+  /**
+   * Wraps the given string in a new {@link StringIdentifier}.
+   *
+   * @param s a string
+   * @return a string identifier
+   */
+  @Override
+  public Identifier getNewInstance(String s) {
+    final Identifier identifier = new StringIdentifier(s);
+    return identifier;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
new file mode 100644
index 0000000..4335f8f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
new file mode 100644
index 0000000..a40222b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingInputStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+public class FramingInputStream extends DataInputStream implements Iterable<byte[]> { // Reads frames stored as a 4-byte length followed by that many payload bytes.
+
+ public FramingInputStream(InputStream in) { // in: the underlying stream the frames are read from
+ super(in);
+ }
+
+ public byte[] readFrame() throws IOException { // Returns the next frame's payload, or null when the length prefix is -1.
+ int i = readInt(); // length prefix of the frame
+ if (i == -1) { // -1 is treated as the end-of-frames marker
+ return null;
+ }
+ byte[] b = new byte[i];
+ readFully(b); // reads exactly i bytes; DataInputStream throws EOFException if the stream ends early
+ return b;
+ }
+
+ @Override
+ public Iterator<byte[]> iterator() { // Iterates over the remaining frames; IOExceptions are rethrown as ServiceRuntimeException.
+ try {
+ return new Iterator<byte[]>() {
+ byte[] cur = readFrame(); // one-frame look-ahead, read as soon as iterator() is called; an IOException here is handled by the outer catch
+
+ @Override
+ public boolean hasNext() {
+ return cur != null; // null look-ahead means the end marker was read
+ }
+
+ @Override
+ public byte[] next() { // NOTE(review): once exhausted this returns null (and tries to read again) instead of throwing NoSuchElementException
+ byte[] ret = cur;
+ try {
+ cur = readFrame(); // advance the look-ahead before handing out the buffered frame
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() { // removal is not supported
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (IOException e) { // raised by the look-ahead read in the anonymous class initializer
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
new file mode 100644
index 0000000..11547a6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingOutputStream.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that groups written bytes into length-prefixed frames.
+ * <p>
+ * Bytes passed to the {@code write} methods are buffered in memory until
+ * {@link #nextFrame()} emits them to the underlying stream as a 4-byte length followed by
+ * the payload. A length of -1 is written as the end marker when the stream is closed.
+ */
+public class FramingOutputStream extends OutputStream implements Accumulable<byte[]> {
+
+  /** Buffer holding the bytes of the frame currently being assembled. */
+  private final ByteArrayOutputStream baos;
+  /** The underlying stream that receives the finished frames. */
+  private final DataOutputStream o;
+  /** Number of payload and length-prefix bytes accounted for so far. */
+  private long offset;
+  private boolean closed;
+
+  /**
+   * @param o the stream to write frames to; it is wrapped in a {@link DataOutputStream}
+   *          unless it already is one
+   */
+  public FramingOutputStream(OutputStream o) {
+    if (!(o instanceof DataOutputStream)) {
+      this.o = new DataOutputStream(o);
+    } else {
+      this.o = (DataOutputStream) o;
+    }
+    this.baos = new ByteArrayOutputStream();
+  }
+
+  /**
+   * Writes the buffered bytes to the underlying stream as one frame and empties the buffer.
+   *
+   * @throws StorageException wrapping the {@link IOException} raised by the underlying stream
+   */
+  public void nextFrame() throws StorageException {
+    try {
+      o.writeInt(baos.size());
+      offset += 4;
+      baos.writeTo(o);
+      baos.reset();
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  /**
+   * @return the number of bytes accounted for so far (payload bytes plus length prefixes)
+   */
+  public long getCurrentOffset() {
+    return offset;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    baos.write(b);
+    offset++;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    baos.write(b);
+    offset += b.length;
+  }
+
+  @Override
+  public void write(byte[] b, int offset, int length) throws IOException {
+    baos.write(b, offset, length);
+    // The parameter shadows the field: qualify with this so the stream offset is advanced.
+    this.offset += length;
+  }
+
+  @Override
+  public void flush() {
+    // no-op.
+  }
+
+  /**
+   * Emits a final frame if anything has been written, writes the -1 end marker and closes
+   * the underlying stream. Calls after a successful close do nothing.
+   *
+   * @throws IOException if writing to or closing the underlying stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      try {
+        if (this.offset > 0) {
+          nextFrame();
+        }
+      } catch (StorageException e) {
+        // nextFrame() wraps an IOException; unwrap it, but never fail with a ClassCastException.
+        final Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new IOException(e);
+      }
+      o.writeInt(-1);
+      o.close();
+      closed = true;
+    }
+  }
+
+  /**
+   * Returns an accumulator that writes each datum directly to the underlying stream as its
+   * own length-prefixed frame, bypassing the in-memory buffer. Closing the accumulator
+   * writes the -1 end marker, closes the underlying stream and marks this stream closed.
+   *
+   * @return an accumulator of complete frames
+   */
+  @Override
+  public Accumulator<byte[]> accumulator() throws StorageException {
+    @SuppressWarnings("resource")
+    final FramingOutputStream fos = this;
+    return new Accumulator<byte[]>() {
+
+      @Override
+      public void add(byte[] datum) throws ServiceException {
+        try {
+          o.writeInt(datum.length);
+          offset += 4;
+          o.write(datum);
+          offset += datum.length;
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+
+      }
+
+      @Override
+      public void close() throws ServiceException {
+        try {
+          o.writeInt(-1);
+          offset += 4;
+          o.close();
+          fos.closed = true;
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+      }
+
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
new file mode 100644
index 0000000..80affdf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleDeserializer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class FramingTupleDeserializer<K, V> implements
+ Deserializer<Tuple<K, V>, InputStream> { // Reads key/value tuples from one stream, with keys and values delimited by 4-byte frame lengths.
+
+ private final Deserializer<K, InputStream> keyDeserializer; // reads the key of each tuple
+ private final Deserializer<V, InputStream> valDeserializer; // reads the value of each tuple
+
+ public FramingTupleDeserializer(Deserializer<K, InputStream> keyDeserializer,
+ Deserializer<V, InputStream> valDeserializer) {
+ this.keyDeserializer = keyDeserializer;
+ this.valDeserializer = valDeserializer;
+ }
+
+ @Override
+ public Iterable<Tuple<K, V>> create(InputStream ins) { // Returns an iterable over the tuples stored in ins.
+ final DataInputStream in = new DataInputStream(ins);
+ final Iterable<K> keyItbl = keyDeserializer.create(in); // keys and values are both read from the same wrapped stream
+ final Iterable<V> valItbl = valDeserializer.create(in);
+ return new Iterable<Tuple<K, V>>() {
+ @Override
+ public Iterator<Tuple<K, V>> iterator() {
+ final Iterator<K> keyIt = keyItbl.iterator();
+ final Iterator<V> valIt = valItbl.iterator();
+ try {
+ return new Iterator<Tuple<K, V>>() {
+
+ private int readFrameLength() throws ServiceException { // NOTE(review): the visible body only throws ServiceRuntimeException; the declared ServiceException is what the surrounding catch blocks handle
+ try {
+ return in.readInt();
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ int nextFrameLength = readFrameLength(); // length prefix of the upcoming frame, read eagerly when the iterator is created
+
+ @Override
+ public boolean hasNext() {
+ return nextFrameLength != -1; // -1 is the end-of-frames marker
+ }
+
+ @Override
+ public Tuple<K, V> next() {
+ try {
+ if (nextFrameLength == -1) {
+ throw new NoSuchElementException();
+ }
+ K k = keyIt.next();
+ readFrameLength(); // consume the length prefix that precedes the value; its value is not used
+ V v = valIt.next();
+ nextFrameLength = readFrameLength(); // look ahead: prefix of the next key frame, or -1 at the end
+ return new Tuple<>(k, v);
+ } catch (ServiceException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() { // removal is not supported
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (ServiceException e) { // declared by readFrameLength(), which the anonymous class initializer calls
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
new file mode 100644
index 0000000..a98babd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/FramingTupleSerializer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Serializer for key/value tuples that sends keys and values through one shared
+ * {@link FramingOutputStream} and starts a new frame between consecutive items.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class FramingTupleSerializer<K, V> implements
+    Serializer<Tuple<K, V>, OutputStream> {
+
+  /** Serializer applied to the key of every tuple. */
+  private final Serializer<K, OutputStream> keySerializer;
+  /** Serializer applied to the value of every tuple. */
+  private final Serializer<V, OutputStream> valSerializer;
+
+  /**
+   * @param keySerializer serializer for tuple keys
+   * @param valSerializer serializer for tuple values
+   */
+  public FramingTupleSerializer(
+      final Serializer<K, OutputStream> keySerializer,
+      final Serializer<V, OutputStream> valSerializer) {
+    this.keySerializer = keySerializer;
+    this.valSerializer = valSerializer;
+  }
+
+  /**
+   * Wraps the given stream in a {@link FramingOutputStream} and returns an accumulable
+   * whose accumulators write tuples to it.
+   *
+   * @param os the stream that receives the serialized tuples
+   * @return an accumulable of tuples
+   */
+  @Override
+  public Accumulable<Tuple<K, V>> create(final OutputStream os) {
+    final FramingOutputStream framedOut = new FramingOutputStream(os);
+
+    return new Accumulable<Tuple<K, V>>() {
+
+      @Override
+      public Accumulator<Tuple<K, V>> accumulator() throws ServiceException {
+        final Accumulator<K> keys = keySerializer.create(framedOut).accumulator();
+        final Accumulator<V> values = valSerializer.create(framedOut).accumulator();
+
+        return new Accumulator<Tuple<K, V>>() {
+          /** Becomes true once the first tuple has been added. */
+          private boolean started = false;
+
+          @Override
+          public void add(Tuple<K, V> datum) throws ServiceException {
+            if (started) {
+              // Close the frame holding the previous tuple's value.
+              framedOut.nextFrame();
+            } else {
+              started = true;
+            }
+            keys.add(datum.getKey());
+            // Separate the key from the value that follows it.
+            framedOut.nextFrame();
+            values.add(datum.getValue());
+          }
+
+          @Override
+          public void close() throws ServiceException {
+            try {
+              keys.close();
+              values.close();
+              framedOut.close();
+            } catch (IOException e) {
+              throw new StorageException(e);
+            }
+          }
+        };
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
new file mode 100644
index 0000000..2dfcc18
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/MergingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.storage.util.TupleKeyComparator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Merges several iterators into a single one. At every step the pending head
+ * elements of all non-exhausted sources are held in a priority queue ordered
+ * by a {@link TupleKeyComparator} built from the supplied comparator, and the
+ * head of that queue is returned next. If every source is already sorted by
+ * that comparator, the merged sequence is sorted as well.
+ *
+ * @param <T> element type
+ */
+public class MergingIterator<T> implements Iterator<T> {
+  /** One entry per non-exhausted source: its pending element paired with the source itself. */
+  private final PriorityQueue<Tuple<T, Iterator<T>>> heap;
+
+  /**
+   * @param c   comparator handed to the {@link TupleKeyComparator} that orders the queue
+   * @param its the source iterators; each one is advanced by one element here
+   */
+  public MergingIterator(final Comparator<T> c, Iterator<T>[] its) {
+    this.heap = new PriorityQueue<Tuple<T, Iterator<T>>>(11,
+        new TupleKeyComparator<T, Iterator<T>>(c));
+
+    for (final Iterator<T> source : its) {
+      // Decide on hasNext() alone. The previous "first element != null" test
+      // silently dropped an entire source whose first element happened to be
+      // null, although next() below re-queues later elements without such a check.
+      if (source.hasNext()) {
+        heap.add(new Tuple<>(source.next(), source));
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !heap.isEmpty();
+  }
+
+  /**
+   * Returns the head of the queue and re-queues the next element of the source
+   * it came from, if that source has one.
+   *
+   * @throws java.util.NoSuchElementException if all sources are exhausted
+   *                                          (raised by {@link PriorityQueue#remove()})
+   */
+  @Override
+  public T next() {
+    final Tuple<T, Iterator<T>> head = heap.remove();
+    final Iterator<T> source = head.getValue();
+    if (source.hasNext()) {
+      heap.add(new Tuple<>(source.next(), source));
+    }
+    return head.getKey();
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException(
+        "Cannot remove entries from MergingIterator!");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
new file mode 100644
index 0000000..0d7353a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ScratchSpace.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+/**
+ * A scratch storage area that reports how much space is available and used,
+ * and that can be deleted.
+ * NOTE(review): units (presumably bytes) and the exact meaning of each figure
+ * are defined by the implementations - confirm there.
+ */
+public interface ScratchSpace {
+ /**
+  * @return the space available in this scratch space
+  */
+ long availableSpace();
+
+ /**
+  * @return the space currently used by this scratch space
+  */
+ long usedSpace();
+
+ /**
+  * Deletes this scratch space.
+  * NOTE(review): whether the space may be used again afterwards is not
+  * specified here - confirm against implementations.
+  */
+ void delete();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
new file mode 100644
index 0000000..1002b76
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/StorageService.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+/**
+ * A storage service that gives access to a {@link ScratchSpace}.
+ */
+public interface StorageService {
+
+ /**
+  * @return the scratch space provided by this service
+  */
+ ScratchSpace getScratchSpace();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
new file mode 100644
index 0000000..c5413e6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulable.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * An {@link Accumulable} backed by a single scratch file. The file is obtained
+ * from the scratch space of a {@link LocalStorageService} at construction time,
+ * and every accumulator handed out writes codec-encoded data to that file.
+ *
+ * @param <T> type of the accumulated data
+ * @param <C> codec type used to encode the data
+ */
+public final class CodecFileAccumulable<T, C extends Codec<T>> implements Accumulable<T> {
+
+  private final File filename;
+  private final C codec;
+
+  /**
+   * @param s     storage service whose scratch space supplies the backing file
+   * @param codec codec passed on to every accumulator
+   */
+  public CodecFileAccumulable(final LocalStorageService s, final C codec) {
+    final File scratchFile = s.getScratchSpace().newFile();
+    this.filename = scratchFile;
+    this.codec = codec;
+  }
+
+  /**
+   * @return the path of the backing file, as a string
+   */
+  public String getName() {
+    return this.filename.getPath();
+  }
+
+  /**
+   * Opens an accumulator on the backing file.
+   *
+   * @return a new accumulator writing to the backing file
+   * @throws StorageException wrapping the {@link IOException} raised when the
+   *                          file cannot be opened for writing
+   */
+  @Override
+  public Accumulator<T> accumulator() throws StorageException {
+    final Accumulator<T> fileAccumulator;
+    try {
+      fileAccumulator = new CodecFileAccumulator<>(this.codec, this.filename);
+    } catch (final IOException e) {
+      throw new StorageException(e);
+    }
+    return fileAccumulator;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
new file mode 100644
index 0000000..3d6d7cb
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileAccumulator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+
+/**
+ * Writes codec-encoded records to a file through an {@link ObjectOutputStream}.
+ * Each record is stored as an {@code int} length followed by that many bytes;
+ * {@link #close()} appends a length of {@code -1} as the end marker.
+ *
+ * @param <T> type of the accumulated data
+ */
+final class CodecFileAccumulator<T> implements Accumulator<T> {
+
+  private final Codec<T> codec;
+  private final ObjectOutputStream out;
+
+  /**
+   * @param codec codec used to encode every datum
+   * @param file  file to write to
+   * @throws IOException if the file cannot be opened or the stream header
+   *                     cannot be written
+   */
+  public CodecFileAccumulator(final Codec<T> codec, final File file) throws IOException {
+    this.codec = codec;
+    final OutputStream fileOut = new FileOutputStream(file);
+    try {
+      this.out = new ObjectOutputStream(new BufferedOutputStream(fileOut));
+    } catch (final IOException | RuntimeException e) {
+      // Do not leak the file handle when the wrapping stream cannot be set up.
+      try {
+        fileOut.close();
+      } catch (final IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Encodes {@code datum} and appends it as one length-prefixed record.
+   *
+   * @throws ServiceException a {@link StorageException} wrapping the
+   *                          {@link IOException} raised while writing
+   */
+  @Override
+  public void add(final T datum) throws ServiceException {
+    final byte[] buf = codec.encode(datum);
+    try {
+      this.out.writeInt(buf.length);
+      this.out.write(buf);
+    } catch (final IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  /**
+   * Writes the {@code -1} end marker and closes the stream. The stream is
+   * closed even when writing the marker fails.
+   *
+   * @throws ServiceException wrapping the {@link IOException} raised while
+   *                          writing the marker or closing the stream
+   */
+  @Override
+  public void close() throws ServiceException {
+    // try-with-resources guarantees the close; a failure of close() after a
+    // failed write is recorded as a suppressed exception.
+    try (final ObjectOutputStream stream = this.out) {
+      stream.writeInt(-1);
+    } catch (final IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
new file mode 100644
index 0000000..9545d19
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A read-only, file-based spool. This class only reads: the spool file has to
+ * be produced by some other party.
+ *
+ * @param <T> type of the stored data
+ * @param <C> codec type used to decode the data
+ */
+public class CodecFileIterable<T, C extends Codec<T>> implements Iterable<T> {
+  private final File filename;
+  private final C codec;
+
+  /**
+   * @param filename the spool file to read
+   * @param codec    codec passed on to every iterator
+   */
+  public CodecFileIterable(final File filename, final C codec) {
+    this.codec = codec;
+    this.filename = filename;
+  }
+
+  /**
+   * Opens a fresh iterator over the spool file.
+   *
+   * @return an iterator over the decoded records
+   * @throws ServiceRuntimeException wrapping a {@link StorageException} when
+   *                                 the file cannot be opened or read
+   */
+  @SuppressWarnings("resource")
+  @Override
+  public Iterator<T> iterator() {
+    final Iterator<T> records;
+    try {
+      records = new CodecFileIterator<>(this.codec, this.filename);
+    } catch (final IOException e) {
+      throw new ServiceRuntimeException(new StorageException(e));
+    }
+    return records;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
new file mode 100644
index 0000000..69538e0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/CodecFileIterator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+final class CodecFileIterator<T> implements Iterator<T> {
+
+ private final Codec<T> codec;
+ private final ObjectInputStream in;
+ private int sz = 0;
+
+ CodecFileIterator(final Codec<T> codec, final File file) throws IOException {
+ this.in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
+ this.codec = codec;
+ this.readNextSize();
+ }
+
+ private void readNextSize() throws IOException {
+ if (this.hasNext()) {
+ try {
+ this.sz = this.in.readInt();
+ if (this.sz == -1) {
+ this.in.close();
+ }
+ } catch (final IOException ex) {
+ this.sz = -1; // Don't read from that file again.
+ this.in.close();
+ throw ex;
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.sz != -1;
+ }
+
+ @Override
+ public T next() {
+ if (!this.hasNext()) {
+ throw new NoSuchElementException("Moving past the end of the file.");
+ }
+ try {
+ final byte[] buf = new byte[this.sz];
+ for (int rem = buf.length; rem > 0; ) {
+ rem -= this.in.read(buf, buf.length - rem, rem);
+ }
+ this.readNextSize();
+ return this.codec.decode(buf);
+ } catch (final IOException e) {
+ throw new ServiceRuntimeException(new StorageException(e));
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Attempt to remove value from read-only input file!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
new file mode 100644
index 0000000..34f0f5c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalScratchSpace.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.io.storage.ScratchSpace;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * A {@link ScratchSpace} made of temporary files created through
+ * {@link File#createTempFile(String, String)}. Every file handed out by
+ * {@link #newFile()} is tracked so that it can be measured and deleted later.
+ */
+public class LocalScratchSpace implements ScratchSpace {
+
+  private final String jobName;
+  private final String evaluatorName;
+  private final Set<File> tempFiles = new ConcurrentSkipListSet<File>();
+  /**
+   * Zero denotes "unlimited"
+   */
+  private final long quota;
+
+  /**
+   * Creates a scratch space with an unlimited quota.
+   *
+   * @param jobName       used in the name prefix of the temporary files
+   * @param evaluatorName used in the name prefix of the temporary files
+   */
+  public LocalScratchSpace(String jobName, String evaluatorName) {
+    this(jobName, evaluatorName, 0);
+  }
+
+  /**
+   * @param jobName       used in the name prefix of the temporary files
+   * @param evaluatorName used in the name prefix of the temporary files
+   * @param quota         the quota reported by {@link #availableSpace()}; zero means unlimited
+   */
+  public LocalScratchSpace(String jobName, String evaluatorName, long quota) {
+    this.jobName = jobName;
+    this.evaluatorName = evaluatorName;
+    this.quota = quota;
+  }
+
+  /**
+   * Creates and registers a new temporary file.
+   *
+   * @return the newly created file
+   * @throws RuntimeException wrapping the {@link IOException} raised when the
+   *                          file cannot be created
+   */
+  public File newFile() {
+    final File ret;
+    try {
+      ret = File.createTempFile("reef-" + jobName + "-" + evaluatorName,
+          "tmp");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    tempFiles.add(ret);
+    return ret;
+  }
+
+  /**
+   * @return the configured quota, unchanged; the space already used is not
+   * subtracted, and zero denotes "unlimited"
+   */
+  @Override
+  public long availableSpace() {
+    return quota;
+  }
+
+  /**
+   * @return the sum of the current lengths of all tracked files
+   */
+  @Override
+  public long usedSpace() {
+    long ret = 0;
+    for (File f : tempFiles) {
+      // TODO: Error handling...
+      ret += f.length();
+    }
+    return ret;
+  }
+
+  /**
+   * Deletes the tracked files. A file stops being tracked only once it is
+   * gone; a file that could not be deleted stays registered, so it still
+   * counts towards {@link #usedSpace()} and is retried by a later call.
+   */
+  @Override
+  public void delete() {
+    // TODO: Error handling. Files.delete() would give us an exception. We
+    // should pass a set of Exceptions into a ReefRuntimeException.
+    //
+    // Entries are removed one by one instead of calling clear() afterwards:
+    // clear() also forgot files whose deletion failed and files registered
+    // concurrently by newFile() after the loop had passed them. Removing
+    // while iterating is safe because ConcurrentSkipListSet iterators are
+    // weakly consistent.
+    for (File f : tempFiles) {
+      if (f.delete() || !f.exists()) {
+        tempFiles.remove(f);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
new file mode 100644
index 0000000..daae6cd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/LocalStorageService.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.io.storage.StorageService;
+
+
+/**
+ * A {@link StorageService} whose scratch space is a {@link LocalScratchSpace}
+ * created once, at construction time, for the given job and evaluator names.
+ */
+public class LocalStorageService implements StorageService {
+
+  private final LocalScratchSpace scratchSpace;
+
+  /**
+   * @param jobName       passed on to the {@link LocalScratchSpace}
+   * @param evaluatorName passed on to the {@link LocalScratchSpace}
+   */
+  public LocalStorageService(String jobName, String evaluatorName) {
+    // The names are only needed by the scratch space, so they are not stored
+    // here a second time.
+    this.scratchSpace = new LocalScratchSpace(jobName, evaluatorName);
+  }
+
+  /**
+   * @return the scratch space created by the constructor; the same instance on every call
+   */
+  @Override
+  public LocalScratchSpace getScratchSpace() {
+    return scratchSpace;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
new file mode 100644
index 0000000..e9a4f41
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/local/SerializerFileSpool.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.local;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+import org.apache.reef.io.serialization.Deserializer;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.*;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+/**
+ * A SpoolFile backed by the filesystem.
+ *
+ * @param <T>
+ */
+public final class SerializerFileSpool<T> implements Spool<T> {
+
+ private final File file;
+ private final Accumulator<T> accumulator;
+ private final Deserializer<T, InputStream> deserializer;
+ private boolean canAppend = true;
+ private boolean canGetAccumulator = true;
+
+ public SerializerFileSpool(final LocalStorageService service,
+ final Serializer<T, OutputStream> out, final Deserializer<T, InputStream> in)
+ throws ServiceException {
+ this.file = service.getScratchSpace().newFile();
+ Accumulable<T> accumulable;
+ try {
+ accumulable = out.create(new BufferedOutputStream(new FileOutputStream(
+ file)));
+ } catch (final FileNotFoundException e) {
+ throw new IllegalStateException(
+ "Unable to create temporary file:" + file, e);
+ }
+ this.deserializer = in;
+
+ final Accumulator<T> acc = accumulable.accumulator();
+ this.accumulator = new Accumulator<T>() {
+ @Override
+ public void add(final T datum) throws ServiceException {
+ if (!canAppend) {
+ throw new ConcurrentModificationException(
+ "Attempt to append after creating iterator!");
+ }
+ acc.add(datum);
+ }
+
+ @Override
+ public void close() throws ServiceException {
+ canAppend = false;
+ acc.close();
+ }
+ };
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ try {
+ if (canAppend) {
+ throw new IllegalStateException(
+ "Need to call close() on accumulator before calling iterator()!");
+ }
+ return deserializer.create(
+ new BufferedInputStream(new FileInputStream(file))).iterator();
+ } catch (final IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Accumulator<T> accumulator() {
+ if (!canGetAccumulator) {
+ throw new UnsupportedOperationException("Can only getAccumulator() once!");
+ }
+ canGetAccumulator = false;
+ return this.accumulator;
+ }
+}