You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:01 UTC
[28/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
new file mode 100644
index 0000000..6d5515e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.driver.parameters.EvaluatorAllocatedHandlers;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * Configuration module that merges {@link DriverConfiguration#CONF} with the
+ * {@link DataLoader} handlers for the clock start and evaluator allocation events.
+ */
+public final class DataLoadingDriverConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * The merged module: the driver configuration plus the two DataLoader set entries.
+   */
+  public static final ConfigurationModule CONF = new DataLoadingDriverConfiguration()
+      .merge(DriverConfiguration.CONF)
+      .bindSetEntry(Clock.StartHandler.class, DataLoader.StartHandler.class)
+      .bindSetEntry(EvaluatorAllocatedHandlers.class, DataLoader.EvaluatorAllocatedHandler.class)
+      .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
new file mode 100644
index 0000000..be479c6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
+import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor;
+import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
+import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+/**
+ * Builder to create a request to the DataLoadingService.
+ * <p/>
+ * The built {@link Configuration} is the given driver configuration module with the
+ * DataLoader event handlers set, plus the data loading parameters of this builder.
+ */
+public final class DataLoadingRequestBuilder
+    implements org.apache.reef.util.Builder<Configuration> {
+
+  private int memoryMB = -1;
+  private int numberOfCores = -1;
+  private int numberOfDesiredSplits = -1;
+  private EvaluatorRequest computeRequest = null;
+  private boolean inMemory = false;
+  private boolean renewFailedEvaluators = true;
+  private ConfigurationModule driverConfigurationModule = null;
+  private String inputFormatClass;
+  private String inputPath;
+
+  /**
+   * Set the desired number of splits of the input.
+   * The value is only bound if it is positive.
+   *
+   * @param numberOfDesiredSplits the desired number of splits
+   * @return this
+   */
+  public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
+    this.numberOfDesiredSplits = numberOfDesiredSplits;
+    return this;
+  }
+
+  /**
+   * Set the memory to be used for Evaluator allocated.
+   * The value is only bound if it is positive.
+   *
+   * @param memoryMB the amount of memory in MB
+   * @return this
+   */
+  public DataLoadingRequestBuilder setMemoryMB(final int memoryMB) {
+    this.memoryMB = memoryMB;
+    return this;
+  }
+
+  /**
+   * Set the core number to be used for Evaluator allocated.
+   * The value is only bound if it is positive.
+   *
+   * @param numberOfCores the number of cores
+   * @return this
+   */
+  public DataLoadingRequestBuilder setNumberOfCores(final int numberOfCores) {
+    this.numberOfCores = numberOfCores;
+    return this;
+  }
+
+  /**
+   * Set the compute request. If set, it is serialized into the configuration.
+   *
+   * @param computeRequest the compute request
+   * @return this
+   */
+  public DataLoadingRequestBuilder setComputeRequest(final EvaluatorRequest computeRequest) {
+    this.computeRequest = computeRequest;
+    return this;
+  }
+
+  /**
+   * Set the value that is bound to {@link LoadDataIntoMemory}.
+   *
+   * @param inMemory the value to bind
+   * @return this
+   */
+  public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) {
+    this.inMemory = inMemory;
+    return this;
+  }
+
+  /**
+   * Choose whether DataLoader.EvaluatorFailedHandler is set as the handler for
+   * failed Evaluators. It is set unless this is called with false.
+   *
+   * @param renewFailedEvaluators true to set the handler, false to leave it out
+   * @return this
+   */
+  public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) {
+    this.renewFailedEvaluators = renewFailedEvaluators;
+    return this;
+  }
+
+  /**
+   * Set the driver configuration module to build upon. This is a required parameter.
+   *
+   * @param driverConfigurationModule the module the DataLoader handlers are set on
+   * @return this
+   */
+  public DataLoadingRequestBuilder setDriverConfigurationModule(
+      final ConfigurationModule driverConfigurationModule) {
+    this.driverConfigurationModule = driverConfigurationModule;
+    return this;
+  }
+
+  /**
+   * Set the InputFormat to use. If not set, {@link TextInputFormat} is used.
+   *
+   * @param inputFormatClass the InputFormat class; its name is what gets bound
+   * @return this
+   */
+  public DataLoadingRequestBuilder setInputFormatClass(
+      final Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass.getName();
+    return this;
+  }
+
+  /**
+   * Set the input path. This is a required parameter.
+   *
+   * @param inputPath the input path
+   * @return this
+   */
+  public DataLoadingRequestBuilder setInputPath(final String inputPath) {
+    this.inputPath = inputPath;
+    return this;
+  }
+
+  /**
+   * Assemble the configuration from the values set on this builder.
+   *
+   * @return the driver configuration including the data loading bindings
+   * @throws BindException if the driver configuration module or the input path is not set
+   */
+  @Override
+  public Configuration build() throws BindException {
+    if (this.driverConfigurationModule == null) {
+      throw new BindException("Driver Configuration Module is a required parameter.");
+    }
+
+    if (this.inputPath == null) {
+      throw new BindException("InputPath is a required parameter.");
+    }
+
+    // Default to text input without writing the default back into the builder.
+    final String inputFormatClassName = this.inputFormatClass != null
+        ? this.inputFormatClass : TextInputFormat.class.getName();
+
+    // The start and allocation handlers are always set; the failure handler is optional.
+    ConfigurationModule driverModule = this.driverConfigurationModule
+        .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class);
+    if (this.renewFailedEvaluators) {
+      driverModule = driverModule
+          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class);
+    }
+
+    final JavaConfigurationBuilder jcb =
+        Tang.Factory.getTang().newConfigurationBuilder(driverModule.build());
+
+    if (this.numberOfDesiredSplits > 0) {
+      jcb.bindNamedParameter(NumberOfDesiredSplits.class,
+          Integer.toString(this.numberOfDesiredSplits));
+    }
+
+    if (this.memoryMB > 0) {
+      jcb.bindNamedParameter(DataLoadingEvaluatorMemoryMB.class,
+          Integer.toString(this.memoryMB));
+    }
+
+    if (this.numberOfCores > 0) {
+      jcb.bindNamedParameter(DataLoadingEvaluatorNumberOfCores.class,
+          Integer.toString(this.numberOfCores));
+    }
+
+    if (this.computeRequest != null) {
+      jcb.bindNamedParameter(DataLoadingComputeRequest.class,
+          EvaluatorRequestSerializer.serialize(this.computeRequest));
+    }
+
+    return jcb
+        .bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
+        .bindConstructor(InputFormat.class, InputFormatExternalConstructor.class)
+        .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
+        .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClassName)
+        .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
+        .bindImplementation(DataLoadingService.class, InputFormatLoadingService.class)
+        .build();
+  }
+
+  /** The desired number of splits of the input; defaults to 0. */
+  @NamedParameter(short_name = "num_splits", default_value = "0")
+  public static final class NumberOfDesiredSplits implements Name<Integer> {
+  }
+
+  /** The memory in MB set through setMemoryMB(); defaults to 4096. */
+  @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = "4096")
+  public static final class DataLoadingEvaluatorMemoryMB implements Name<Integer> {
+  }
+
+  /** The number of cores set through setNumberOfCores(); defaults to 1. */
+  @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = "1")
+  public static final class DataLoadingEvaluatorNumberOfCores implements Name<Integer> {
+  }
+
+  /** The compute request as serialized by EvaluatorRequestSerializer; defaults to "NULL". */
+  @NamedParameter(default_value = "NULL")
+  public static final class DataLoadingComputeRequest implements Name<String> {
+  }
+
+  /** The flag set through loadIntoMemory(); defaults to false. */
+  @NamedParameter(default_value = "false")
+  public static final class LoadDataIntoMemory implements Name<Boolean> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
new file mode 100644
index 0000000..9461148
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.tang.Configuration;
+
+/**
+ * All data loading services should implement this interface.
+ */
+@DriverSide
+public interface DataLoadingService {
+
+  /**
+   * Access to the number of partitions suggested by this DataSource.
+   *
+   * @return the number of partitions suggested by this DataSource.
+   */
+  int getNumberOfPartitions();
+
+  /**
+   * Access to the context configuration for an Evaluator.
+   *
+   * @param allocatedEvaluator the Evaluator to get the context configuration for.
+   * @return the context configuration for the given Evaluator.
+   */
+  Configuration getContextConfiguration(AllocatedEvaluator allocatedEvaluator);
+
+  /**
+   * Access to the service configuration for an Evaluator.
+   *
+   * @param allocatedEvaluator the Evaluator to get the service configuration for.
+   * @return the service configuration for the given Evaluator.
+   */
+  Configuration getServiceConfiguration(AllocatedEvaluator allocatedEvaluator);
+
+  /**
+   * Access to the prefix of the compute context ids.
+   *
+   * @return the prefix to be used to enumerate
+   * context ids for compute requests fired other than
+   * the data load contexts.
+   */
+  String getComputeContextIdPrefix();
+
+  /**
+   * Distinguishes data loaded contexts from compute contexts.
+   *
+   * @param context the context to check.
+   * @return true if this context has been loaded with data.
+   */
+  boolean isDataLoadedContext(ActiveContext context);
+
+  /**
+   * Distinguishes compute contexts from data loading contexts.
+   *
+   * @param context the context to check.
+   * @return true if this is a computation context,
+   * false otherwise. (e.g. this is a data loading context).
+   */
+  boolean isComputeContext(ActiveContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
new file mode 100644
index 0000000..a15764d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.util.Pair;
+
+/**
+ * A view of the data set to be loaded
+ * at an evaluator, exposed as an {@link Iterable}
+ * of key-value {@link Pair}s.
+ * <p/>
+ * Implementations need not materialize the data,
+ * and clients should not assume that the data
+ * is materialized. Any materialization is left
+ * as a post-processing step.
+ * <p/>
+ * Clients also cannot assume that the iterator
+ * returned here can be restarted.
+ *
+ * @param <K> the type of the keys
+ * @param <V> the type of the values
+ */
+@TaskSide
+public interface DataSet<K, V> extends Iterable<Pair<K, V>> {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
new file mode 100644
index 0000000..ddf3b15
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Event handler that submits {@link EvaluatorRequest}s to the {@link EvaluatorRequestor}.
+ * Each request is held back until {@link #releaseResourceRequestGate()} has been called.
+ */
+public class ResourceRequestHandler implements EventHandler<EvaluatorRequest> {
+
+  private static final Logger LOG = Logger.getLogger(ResourceRequestHandler.class.getName());
+
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * Gate that onNext() waits on. Volatile because onNext() replaces the latch while
+   * releaseResourceRequestGate() reads it, and onNext() blocks in await(), so the
+   * release can only come from a different thread.
+   */
+  private volatile CountDownLatch resourceRequestGate = new CountDownLatch(1);
+
+  /**
+   * @param requestor the requestor that gated requests are submitted to
+   */
+  public ResourceRequestHandler(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+  }
+
+  /**
+   * Open the gate so that a request waiting in (or arriving at) onNext() is submitted.
+   */
+  public void releaseResourceRequestGate() {
+    LOG.log(Level.FINE, "Releasing Gate");
+    this.resourceRequestGate.countDown();
+  }
+
+  /**
+   * Wait for the gate to be released, close it again and submit the request.
+   * If the wait is interrupted, the request is not submitted and the interrupt
+   * status of the thread is restored.
+   *
+   * @param request the request to submit
+   */
+  @Override
+  public void onNext(final EvaluatorRequest request) {
+    try {
+      LOG.log(Level.FINE,
+          "Processing a request with count: {0} - Waiting for gate to be released",
+          request.getNumber());
+
+      this.resourceRequestGate.await();
+
+      LOG.log(Level.FINE, "Gate released. Submitting request: {0}", request);
+      // Close the gate before submitting, so the next request waits for a new release.
+      this.resourceRequestGate = new CountDownLatch(1);
+      this.requestor.submit(request);
+    } catch (final InterruptedException ex) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.log(Level.WARNING, "Interrupted while waiting for the gate. Request not submitted.", ex);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
new file mode 100644
index 0000000..2f493c6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+
+import java.io.*;
+
+/**
+ * Serialize and deserialize EvaluatorRequest objects.
+ * Currently only supports number, memory and number of cores.
+ * Does not take care of Resource Descriptor.
+ */
+public class EvaluatorRequestSerializer {
+
+  /**
+   * Encode the number, memory and number of cores of a request as a Base64 string.
+   *
+   * @param request the request to serialize
+   * @return the Base64 encoding of the three values, written as ints in that order
+   * @throws RuntimeException if writing the values fails
+   */
+  public static String serialize(final EvaluatorRequest request) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataOutputStream daos = new DataOutputStream(baos)) {
+        daos.writeInt(request.getNumber());
+        daos.writeInt(request.getMegaBytes());
+        daos.writeInt(request.getNumberOfCores());
+      }
+      // Encode only after the data stream is closed, i.e. after everything was written.
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to serialize compute request", e);
+    }
+  }
+
+  /**
+   * Rebuild a request from a string produced by {@link #serialize(EvaluatorRequest)}.
+   *
+   * @param serializedRequest the Base64 string to decode
+   * @return a request with the decoded number, memory and number of cores
+   * @throws RuntimeException if the three ints cannot be read from the decoded bytes
+   */
+  public static EvaluatorRequest deserialize(final String serializedRequest) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRequest))) {
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        return EvaluatorRequest.newBuilder()
+            .setNumber(dais.readInt())
+            .setMemory(dais.readInt())
+            .setNumberOfCores(dais.readInt())
+            .build();
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to de-serialize compute request", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
new file mode 100644
index 0000000..5a5c0ff
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Tracks which data partition is assigned to which evaluator.
+ * It is part of the implementation of a {@link DataLoadingService}
+ * that uses the Hadoop {@link InputFormat} to partition the data
+ * and request resources accordingly.
+ * <p/>
+ * This is an online version which satisfies requests in a greedy way.
+ *
+ * @param <V> the type of the input splits
+ */
+@DriverSide
+public class EvaluatorToPartitionMapper<V extends InputSplit> {
+  private static final Logger LOG = Logger
+      .getLogger(EvaluatorToPartitionMapper.class.getName());
+
+  private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>();
+  private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>();
+
+  /**
+   * Initializes the mapping from locations to the splits they host.
+   * Every split is numbered with its index in the given array.
+   *
+   * @param splits the splits to be assigned to evaluators
+   */
+  public EvaluatorToPartitionMapper(V[] splits) {
+    try {
+      for (int index = 0; index < splits.length; index++) {
+        LOG.log(Level.FINE, "Processing split: " + index);
+        final V current = splits[index];
+        final String[] hosts = current.getLocations();
+        final NumberedSplit<V> numbered = new NumberedSplit<>(current, index);
+        unallocatedSplits.add(numbered);
+        for (final String host : hosts) {
+          // Reuse the queue already registered for this host, if there is one.
+          final BlockingQueue<NumberedSplit<V>> fresh = new LinkedBlockingQueue<>();
+          final BlockingQueue<NumberedSplit<V>> existing = locationToSplits.putIfAbsent(host, fresh);
+          final BlockingQueue<NumberedSplit<V>> target = (existing == null) ? fresh : existing;
+          target.add(numbered);
+        }
+      }
+      for (final Map.Entry<String, BlockingQueue<NumberedSplit<V>>> entry : locationToSplits.entrySet()) {
+        LOG.log(Level.FINE, entry.getKey() + ": " + entry.getValue().toString());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Unable to get InputSplits using the specified InputFormat", e);
+    }
+  }
+
+  /**
+   * Get the input split assigned to this evaluator,
+   * allocating one if none is assigned yet.
+   * <p/>
+   * Splits registered under the given host name are tried first;
+   * otherwise one of the unallocated splits is taken.
+   *
+   * @param hostName    the host name used to look up locally hosted splits
+   * @param evaluatorId the id of the evaluator
+   * @return the split assigned to the evaluator
+   * @throws RuntimeException if no split could be assigned
+   */
+  public NumberedSplit<V> getInputSplit(final String hostName, final String evaluatorId) {
+    synchronized (evaluatorToSplits) {
+      final NumberedSplit<V> assigned = evaluatorToSplits.get(evaluatorId);
+      if (assigned != null) {
+        LOG.log(Level.FINE, "Found an already allocated partition");
+        LOG.log(Level.FINE, evaluatorToSplits.toString());
+        return assigned;
+      }
+    }
+    LOG.log(Level.FINE, "allocated partition not found");
+    final BlockingQueue<NumberedSplit<V>> localSplits = locationToSplits.get(hostName);
+    if (localSplits != null) {
+      LOG.log(Level.FINE, "Found partitions possibly hosted for " + evaluatorId + " at " + hostName);
+      final NumberedSplit<V> localSplit = allocateSplit(evaluatorId, localSplits);
+      LOG.log(Level.FINE, evaluatorToSplits.toString());
+      if (localSplit != null) {
+        return localSplit;
+      }
+    }
+    //pick random
+    LOG.log(
+        Level.FINE,
+        hostName
+            + " does not host any partitions or someone else took partitions hosted here. Picking a random one");
+    final NumberedSplit<V> anySplit = allocateSplit(evaluatorId, unallocatedSplits);
+    LOG.log(Level.FINE, evaluatorToSplits.toString());
+    if (anySplit == null) {
+      throw new RuntimeException("Unable to find an input partition to evaluator " + evaluatorId);
+    }
+    return anySplit;
+  }
+
+  /**
+   * Take splits from the given queue until one is found that is still unallocated,
+   * and record it as the split of the evaluator.
+   *
+   * @param evaluatorId the id of the evaluator
+   * @param value       the queue to take the split from
+   * @return the allocated split, or null if the queue is null or yields no unallocated split
+   * @throws RuntimeException if the evaluator already has a split
+   */
+  private NumberedSplit<V> allocateSplit(final String evaluatorId,
+                                         final BlockingQueue<NumberedSplit<V>> value) {
+    if (value == null) {
+      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
+      return null;
+    }
+    for (NumberedSplit<V> candidate = value.poll(); candidate != null; candidate = value.poll()) {
+      // A split taken from a location queue may have been handed out already.
+      final boolean stillFree = value == unallocatedSplits || unallocatedSplits.remove(candidate);
+      if (!stillFree) {
+        continue;
+      }
+      LOG.log(Level.FINE, "Found split-" + candidate.getIndex() + " in the queue");
+      if (evaluatorToSplits.putIfAbsent(evaluatorId, candidate) != null) {
+        final String msg = "Trying to assign different partitions to the same evaluator " +
+            "is not supported";
+        LOG.severe(msg);
+        throw new RuntimeException(msg);
+      }
+      LOG.log(Level.FINE, "Returning " + candidate.getIndex());
+      return candidate;
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
new file mode 100644
index 0000000..e729aaf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link DataSet} that reads all records of an {@link InputFormatDataSet}
+ * into a list on the first call to {@link #iterator()} and serves every
+ * iteration from that list.
+ *
+ * @param <K> the type of the keys
+ * @param <V> the type of the values
+ */
+public class InMemoryInputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
+    implements DataSet<K, V> {
+
+  private final InputFormatDataSet<K, V> inputFormatDataSet;
+  private List<Pair<K, V>> recordsList = null;
+
+  /**
+   * @param inputFormatDataSet the data set whose records are loaded into memory
+   */
+  @Inject
+  public InMemoryInputFormatDataSet(InputFormatDataSet<K, V> inputFormatDataSet) {
+    this.inputFormatDataSet = inputFormatDataSet;
+  }
+
+  /**
+   * Return an iterator over the in-memory records, loading them on the first call.
+   * <p/>
+   * The list is only kept once it has been read completely, so a failure while
+   * reading does not leave a truncated list behind for later calls.
+   *
+   * @return an iterator over the loaded records
+   */
+  @Override
+  public synchronized Iterator<Pair<K, V>> iterator() {
+    if (recordsList == null) {
+      final List<Pair<K, V>> loaded = new ArrayList<>();
+      for (final Pair<K, V> keyValue : inputFormatDataSet) {
+        loaded.add(keyValue);
+      }
+      recordsList = loaded;
+    }
+    return recordsList.iterator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
new file mode 100644
index 0000000..a7632f3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.*;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link DataSet} that reads records using a RecordReader
+ * encoded inside an InputSplit.
+ * <p>
+ * The input split is injected through an external constructor by deserializing
+ * the input split assigned to this evaluator.
+ * <p>
+ * Every call to {@link #iterator()} asks the InputFormat for a RecordReader
+ * over the split and wraps it in a new iterator.
+ *
+ * @param <K> key type produced by the RecordReader
+ * @param <V> value type produced by the RecordReader
+ */
+@TaskSide
+public final class
+    InputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
+    implements DataSet<K, V> {
+
+  private final DummyReporter dummyReporter = new DummyReporter();
+  private final JobConf jobConf;
+  private final InputFormat<K, V> inputFormat;
+  private final InputSplit split;
+  private RecordReader<K, V> lastRecordReader = null;
+
+  /**
+   * @param split   the input split this data set reads
+   * @param jobConf the job configuration that names the InputFormat to use
+   */
+  // JobConf.getInputFormat() is not parameterized, so the assignment to
+  // InputFormat<K, V> below is an unchecked conversion.
+  @SuppressWarnings("unchecked")
+  @Inject
+  public InputFormatDataSet(final InputSplit split, final JobConf jobConf) {
+    this.jobConf = jobConf;
+    this.inputFormat = this.jobConf.getInputFormat();
+    this.split = split;
+  }
+
+  /**
+   * Creates an iterator backed by a RecordReader obtained from the InputFormat.
+   *
+   * @return an iterator over the records of the split
+   * @throws RuntimeException if the RecordReader cannot be created, or if the
+   *                          InputFormat hands back the same reader instance
+   *                          as on the previous call
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    try {
+
+      final RecordReader<K, V> newRecordReader =
+          this.inputFormat.getRecordReader(this.split, this.jobConf, this.dummyReporter);
+
+      if (newRecordReader == this.lastRecordReader) {
+        throw new RuntimeException("Received the same record reader again. This isn't supported.");
+      }
+
+      this.lastRecordReader = newRecordReader;
+      return new RecordReaderIterator(newRecordReader);
+
+    } catch (final IOException ex) {
+      throw new RuntimeException("Can't instantiate iterator.", ex);
+    }
+  }
+
+  /**
+   * Iterator that reads one record ahead so that {@link #hasNext()} can be
+   * answered without touching the reader.
+   */
+  private final class RecordReaderIterator implements Iterator<Pair<K, V>> {
+
+    private final RecordReader<K, V> recordReader;
+    private Pair<K, V> recordPair;
+    private boolean hasNext;
+
+    RecordReaderIterator(final RecordReader<K, V> recordReader) {
+      this.recordReader = recordReader;
+      fetchRecord();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return this.hasNext;
+    }
+
+    @Override
+    public Pair<K, V> next() {
+      // Honor the Iterator contract instead of handing out the empty
+      // key/value pair that was allocated for the failed read.
+      if (!this.hasNext) {
+        throw new java.util.NoSuchElementException("No more records in the RecordReader");
+      }
+      final Pair<K, V> prevRecordPair = this.recordPair;
+      fetchRecord();
+      return prevRecordPair;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported on RecordReader iterator");
+    }
+
+    /**
+     * Reads the next record into a fresh key/value pair and records whether
+     * the read succeeded. Closes the reader once it reports the end of input.
+     */
+    private void fetchRecord() {
+      this.recordPair = new Pair<>(this.recordReader.createKey(), this.recordReader.createValue());
+      try {
+        this.hasNext = this.recordReader.next(this.recordPair.first, this.recordPair.second);
+      } catch (final IOException ex) {
+        throw new RuntimeException("Unable to read the next record from the RecordReader", ex);
+      }
+      if (!this.hasNext) {
+        // The reader is exhausted and next() refuses further reads, so
+        // release whatever the reader holds open.
+        try {
+          this.recordReader.close();
+        } catch (final IOException ex) {
+          throw new RuntimeException("Unable to close the RecordReader", ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * A {@link Reporter} that ignores all progress, status and counter updates.
+   * Counter lookups return {@code null}.
+   */
+  private static final class DummyReporter implements Reporter {
+
+    @Override
+    public void progress() {
+    }
+
+    @Override
+    public Counter getCounter(final Enum<?> key) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(final String group, final String name) {
+      return null;
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("This is a Fake Reporter");
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void incrCounter(final Enum<?> key, final long amount) {
+    }
+
+    @Override
+    public void incrCounter(final String group, final String counter, final long amount) {
+    }
+
+    @Override
+    public void setStatus(final String status) {
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
new file mode 100644
index 0000000..aef6226
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.ExternalConstructor;
+
+import javax.inject.Inject;
+
+
+/**
+ * A Tang {@link ExternalConstructor} that supplies the {@link InputFormat}
+ * configured in the injected {@link JobConf}.
+ */
+@DriverSide
+public class InputFormatExternalConstructor implements ExternalConstructor<InputFormat<?, ?>> {
+
+  private final InputFormat<?, ?> inputFormat;
+
+  /**
+   * Reads the InputFormat from the job configuration once, at injection time.
+   * The JobConf itself is not retained because nothing else in this class
+   * needs it.
+   *
+   * @param jobConf the job configuration to take the InputFormat from
+   */
+  @Inject
+  public InputFormatExternalConstructor(final JobConf jobConf) {
+    this.inputFormat = jobConf.getInputFormat();
+  }
+
+  /**
+   * @return the InputFormat obtained in the constructor; the same instance on every call
+   */
+  @Override
+  public InputFormat<?, ?> newInstance() {
+    return this.inputFormat;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
new file mode 100644
index 0000000..a2c9cb1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of {@link DataLoadingService}
+ * that uses the Hadoop {@link InputFormat} to find
+ * partitions of data and request resources.
+ * <p>
+ * The InputFormat is injected using a Tang external constructor.
+ * <p>
+ * It also tries to obtain data locality in a greedy
+ * fashion using {@link EvaluatorToPartitionMapper}.
+ *
+ * @param <K> key type of the injected InputFormat
+ * @param <V> value type of the injected InputFormat
+ */
+@DriverSide
+public class InputFormatLoadingService<K, V> implements DataLoadingService {
+
+  private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());
+
+  private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";
+
+  // NOTE(review): the Random is seeded with a constant, so the number embedded
+  // in this prefix is the same on every run -- confirm that this is intended.
+  private static final String COMPUTE_CONTEXT_PREFIX =
+      "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
+
+  private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;
+  private final int numberOfPartitions;
+
+  private final boolean inMemory;
+
+  private final String inputFormatClass;
+
+  private final String inputPath;
+
+  /**
+   * Asks the InputFormat for its splits and hands them to a new
+   * {@link EvaluatorToPartitionMapper}.
+   *
+   * @param inputFormat           the InputFormat used to compute the splits
+   * @param jobConf               the job configuration passed to the InputFormat
+   * @param numberOfDesiredSplits the split count requested from the InputFormat
+   * @param inMemory              whether evaluators get the in-memory data set implementation
+   * @param inputFormatClass      InputFormat class name forwarded to the evaluators
+   * @param inputPath             input path forwarded to the evaluators
+   * @throws RuntimeException if the InputFormat fails to compute the splits
+   */
+  @Inject
+  public InputFormatLoadingService(
+      final InputFormat<K, V> inputFormat,
+      final JobConf jobConf,
+      final @Parameter(DataLoadingRequestBuilder.NumberOfDesiredSplits.class) int numberOfDesiredSplits,
+      final @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
+      final @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass,
+      final @Parameter(JobConfExternalConstructor.InputPath.class) String inputPath) {
+
+    this.inMemory = inMemory;
+    this.inputFormatClass = inputFormatClass;
+    this.inputPath = inputPath;
+
+    try {
+
+      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
+      if (LOG.isLoggable(Level.FINEST)) {
+        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
+      }
+
+      this.numberOfPartitions = inputSplits.length;
+      LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);
+
+      this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);
+
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
+    }
+  }
+
+  /**
+   * @return the number of splits the InputFormat produced
+   */
+  @Override
+  public int getNumberOfPartitions() {
+    return this.numberOfPartitions;
+  }
+
+  /**
+   * Builds the context configuration for an evaluator. The context identifier
+   * is the data-load prefix followed by the index of the evaluator's split.
+   *
+   * @param allocatedEvaluator the evaluator to configure
+   * @return the context configuration
+   */
+  @Override
+  public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {
+    final String contextId =
+        DATA_LOAD_CONTEXT_PREFIX + splitFor(allocatedEvaluator).getIndex();
+
+    return ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, contextId)
+        .build();
+  }
+
+  /**
+   * Builds the service configuration for an evaluator: the data set
+   * implementation, the parameters needed to rebuild the JobConf, and the
+   * serialized split for the evaluator.
+   *
+   * @param allocatedEvaluator the evaluator to configure
+   * @return the service configuration
+   * @throws RuntimeException if the configuration cannot be bound
+   */
+  @Override
+  public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {
+
+    try {
+
+      final NumberedSplit<InputSplit> numberedSplit = splitFor(allocatedEvaluator);
+
+      final Configuration serviceConfiguration = ServiceConfiguration.CONF
+          .set(ServiceConfiguration.SERVICES,
+              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
+          .build();
+
+      return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
+          .bindImplementation(
+              DataSet.class,
+              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
+          .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
+          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
+          .bindNamedParameter(
+              InputSplitExternalConstructor.SerializedInputSplit.class,
+              WritableSerializer.serialize(numberedSplit.getEntry()))
+          .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
+          .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
+          .build();
+
+    } catch (final BindException ex) {
+      final String msg =
+          "Unable to create configuration for evaluator " + allocatedEvaluator.getId();
+      LOG.log(Level.WARNING, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  @Override
+  public String getComputeContextIdPrefix() {
+    return COMPUTE_CONTEXT_PREFIX;
+  }
+
+  /**
+   * @return true if the context id starts with the compute context prefix
+   */
+  @Override
+  public boolean isComputeContext(final ActiveContext context) {
+    return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
+  }
+
+  /**
+   * @return true if the context id starts with the data-load context prefix
+   */
+  @Override
+  public boolean isDataLoadedContext(final ActiveContext context) {
+    return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
+  }
+
+  /**
+   * Asks the partition mapper for the split of the given evaluator, keyed by
+   * the evaluator's node name and id.
+   */
+  private NumberedSplit<InputSplit> splitFor(final AllocatedEvaluator allocatedEvaluator) {
+    final String nodeName =
+        allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName();
+    return this.evaluatorToPartitionMapper.getInputSplit(nodeName, allocatedEvaluator.getId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
new file mode 100644
index 0000000..cedad2d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Tang {@link ExternalConstructor} for {@link InputSplit}: the split is
+ * rebuilt from the serialized form bound to {@link SerializedInputSplit}.
+ */
+@TaskSide
+public class InputSplitExternalConstructor implements ExternalConstructor<InputSplit> {
+
+  /**
+   * Named parameter carrying the serialized input split as a String.
+   */
+  @NamedParameter
+  public static final class SerializedInputSplit implements Name<String> {
+  }
+
+  private final InputSplit inputSplit;
+
+  /**
+   * Deserializes the split eagerly, at injection time.
+   *
+   * @param jobConf              the job configuration handed to the deserializer
+   * @param serializedInputSplit the split as produced by {@link WritableSerializer}
+   */
+  @Inject
+  public InputSplitExternalConstructor(
+      final JobConf jobConf,
+      @Parameter(SerializedInputSplit.class) final String serializedInputSplit) {
+    this.inputSplit = WritableSerializer.deserialize(serializedInputSplit, jobConf);
+  }
+
+  /**
+   * @return the split deserialized in the constructor; the same instance on every call
+   */
+  @Override
+  public InputSplit newInstance() {
+    return this.inputSplit;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
new file mode 100644
index 0000000..67be9a7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Tang {@link ExternalConstructor} that builds a {@link JobConf} from the
+ * {@link InputFormatClass} and {@link InputPath} named parameters.
+ */
+public class JobConfExternalConstructor implements ExternalConstructor<JobConf> {
+
+  private static final Logger LOG = Logger.getLogger(JobConfExternalConstructor.class.getName());
+
+  private final String inputFormatClassName;
+  private final String inputPath;
+
+  /**
+   * @param inputFormatClassName fully qualified name of the InputFormat class to load
+   * @param inputPath            the path handed to the InputFormat's addInputPath method
+   */
+  @Inject
+  public JobConfExternalConstructor(
+      final @Parameter(InputFormatClass.class) String inputFormatClassName,
+      final @Parameter(InputPath.class) String inputPath) {
+    this.inputFormatClassName = inputFormatClassName;
+    this.inputPath = inputPath;
+  }
+
+  /**
+   * Creates a new JobConf, sets the InputFormat class on it and, when the
+   * class has a public {@code addInputPath(JobConf, Path)} method, calls that
+   * method reflectively with the input path.
+   * <p>
+   * If the method does not exist, that is logged at INFO level and the
+   * JobConf is returned without an input path.
+   * <p>
+   * NOTE(review): when {@link InputPath} is left at its default, the literal
+   * "NULL" is still passed on as a path -- confirm that this is intended.
+   *
+   * @return a new JobConf on every call
+   * @throws RuntimeException if the class cannot be loaded or addInputPath
+   *                          exists but cannot be called
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public JobConf newInstance() {
+
+    final JobConf conf = new JobConf();
+
+    try {
+
+      final Class<? extends InputFormat> formatClass =
+          (Class<? extends InputFormat>) Class.forName(this.inputFormatClassName);
+      conf.setInputFormat(formatClass);
+
+      formatClass
+          .getMethod("addInputPath", JobConf.class, Path.class)
+          .invoke(formatClass, conf, new Path(this.inputPath));
+
+    } catch (final NoSuchMethodException ex) {
+      LOG.log(Level.INFO, "{0}.addInputPath() method does not exist", this.inputFormatClassName);
+    } catch (final ClassNotFoundException ex) {
+      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
+          + " ClassNotFoundException while creating newInstance of JobConf", ex);
+    } catch (final InvocationTargetException | IllegalAccessException ex) {
+      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
+          + ".addInputPath() method exists, but cannot be called.", ex);
+    }
+
+    return conf;
+  }
+
+  /**
+   * Named parameter holding the class name of the InputFormat. Has no default.
+   */
+  @NamedParameter()
+  public static final class InputFormatClass implements Name<String> {
+  }
+
+  /**
+   * Named parameter holding the input path. Defaults to the string "NULL".
+   */
+  @NamedParameter(default_value = "NULL")
+  public static final class InputPath implements Name<String> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
new file mode 100644
index 0000000..6cef3b7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A tuple of an object of type E and an integer index.
+ * Used inside {@link EvaluatorToPartitionMapper} to
+ * mark the partitions associated with each {@link InputSplit}.
+ * <p>
+ * Instances are ordered by index only. {@code equals} and {@code hashCode}
+ * are not overridden, so two instances with the same index compare as 0 but
+ * are not equal.
+ *
+ * @param <E> type of the wrapped entry
+ */
+final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
+  private final E entry;
+  private final int index;
+
+  /**
+   * @param entry the wrapped object; must not be null
+   * @param index the index assigned to the entry
+   * @throws IllegalArgumentException if entry is null
+   */
+  public NumberedSplit(final E entry, final int index) {
+    if (entry == null) {
+      throw new IllegalArgumentException("Entry cannot be null");
+    }
+    this.entry = entry;
+    this.index = index;
+  }
+
+  public E getEntry() {
+    return entry;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  @Override
+  public String toString() {
+    return "InputSplit-" + index;
+  }
+
+  /**
+   * Compares by index.
+   *
+   * @return -1, 0 or 1 as this index is less than, equal to or greater than the other's
+   */
+  @Override
+  public int compareTo(final NumberedSplit<E> o) {
+    return Integer.compare(this.index, o.index);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
new file mode 100644
index 0000000..80264b1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+
+/**
+ * A serializer class that serializes {@link Writable}s
+ * into a Base64 String using the below {@link Codec} that
+ * encodes and decodes {@link Writable}s.
+ * The class name is stored in the serialized
+ * form so that the specific type can be instantiated on
+ * de-serialization. Instantiation uses a JobConf: either the one
+ * passed in, or a new default JobConf when none is given.
+ */
+public class WritableSerializer {
+
+  /**
+   * @param writable the object to serialize
+   * @return the Base64 encoding of the class name followed by the written fields
+   */
+  public static <E extends Writable> String serialize(E writable) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>();
+    return Base64.encodeBase64String(writableCodec.encode(writable));
+  }
+
+  /**
+   * De-serializes using a newly created default JobConf.
+   *
+   * @param serializedWritable a String produced by {@link #serialize(Writable)}
+   * @return the reconstructed Writable
+   */
+  public static <E extends Writable> E deserialize(String serializedWritable) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>();
+    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
+  }
+
+  /**
+   * De-serializes using the given JobConf to instantiate the Writable.
+   *
+   * @param serializedWritable a String produced by {@link #serialize(Writable)}
+   * @param jobConf            the configuration passed to the instantiated object
+   * @return the reconstructed Writable
+   */
+  public static <E extends Writable> E deserialize(String serializedWritable, JobConf jobConf) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>(jobConf);
+    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
+  }
+
+  /**
+   * Codec whose wire format is the Writable's class name (as written by
+   * {@code writeUTF}) followed by whatever the Writable itself writes.
+   */
+  static class WritableCodec<E extends Writable> implements Codec<E> {
+    private final JobConf jobConf;
+
+    public WritableCodec(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+
+    public WritableCodec() {
+      this.jobConf = new JobConf();
+    }
+
+    // The cast to E cannot be checked at runtime; the caller picks E and a
+    // mismatch only shows up as a ClassCastException at the use site.
+    @SuppressWarnings("unchecked")
+    @Override
+    public E decode(byte[] bytes) {
+      final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        final String className = dais.readUTF();
+        E writable = (E) ReflectionUtils.newInstance(Class.forName(className), jobConf);
+        writable.readFields(dais);
+        return writable;
+      } catch (IOException e) {
+        throw new RuntimeException("Could not de-serialize Writable", e);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Could not instantiate specific writable class", e);
+      }
+    }
+
+    @Override
+    public byte[] encode(E writable) {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try (final DataOutputStream daos = new DataOutputStream(baos)) {
+        daos.writeUTF(writable.getClass().getName());
+        writable.write(daos);
+        // Make sure everything reached the byte array before it is copied out.
+        daos.flush();
+        return baos.toByteArray();
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Could not serialize Writable of type " + writable.getClass().getName(), e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
new file mode 100644
index 0000000..718a576
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Cache for network and naming services.
+ * <p>
+ * NOTE(review): a detached comment inside this interface used to read
+ * "Constructs with timeout; key is evicted when it's not used for timeout
+ * milli-seconds". No constructor is declared here, so that presumably
+ * describes the implementations -- confirm against them.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface Cache<K, V> {
+
+ /**
+ * Returns a value for the key if cached; otherwise creates, caches and returns it.
+ * When it creates a value for a key, only one callable for the key is executed.
+ *
+ * @param key a key
+ * @param valueFetcher a value fetcher
+ * @return a value
+ * @throws ExecutionException presumably when the value fetcher fails -- confirm against implementations
+ */
+ public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
+
+ /**
+ * Invalidates a key from the cache.
+ *
+ * @param key a key
+ */
+ public void invalidate(K key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
new file mode 100644
index 0000000..42d05f9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+/**
+ * Connection between two end-points named by identifiers.
+ *
+ * @param <T> type of the objects written to the connection
+ */
+public interface Connection<T> extends AutoCloseable {
+
+ /**
+ * Opens the connection.
+ *
+ * @throws NetworkException presumably when the connection cannot be opened -- confirm against implementations
+ */
+ void open() throws NetworkException;
+
+ /**
+ * Writes an object to the connection.
+ *
+ * @param obj the object to write
+ * @throws NetworkException presumably when the write fails -- confirm against implementations
+ */
+ void write(T obj) throws NetworkException;
+
+ /**
+ * Closes the connection. Overrides {@link AutoCloseable#close()} and
+ * declares only {@link NetworkException}.
+ *
+ * @throws NetworkException presumably when closing fails -- confirm against implementations
+ */
+ @Override
+ void close() throws NetworkException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
new file mode 100644
index 0000000..40b28b0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Factory for {@link Connection} instances.
+ *
+ * @param <T> the type of the objects written to the connections created by this factory
+ */
+public interface ConnectionFactory<T> {
+
+  /**
+   * Creates a new connection to the given destination.
+   *
+   * @param destId the identifier of the destination end-point
+   * @return a connection to {@code destId}
+   */
+  Connection<T> newConnection(Identifier destId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
new file mode 100644
index 0000000..1aaa7e0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Network message: a source identifier, a destination identifier and the
+ * data objects being carried.
+ *
+ * @param <T> the type of the data objects carried by the message
+ */
+public interface Message<T> {
+
+ /**
+ * Gets the source identifier of this message.
+ *
+ * @return the source identifier
+ */
+ Identifier getSrcId();
+
+ /**
+ * Gets the destination identifier of this message.
+ *
+ * @return the destination identifier
+ */
+ Identifier getDestId();
+
+ /**
+ * Gets the data carried by this message.
+ *
+ * @return an iterable of data objects
+ */
+ Iterable<T> getData();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
new file mode 100644
index 0000000..1443a46
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+
+/**
+ * Factory that creates a {@link Transport}.
+ */
+public interface TransportFactory {
+
+  /**
+   * Creates a transport.
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client-side handler
+   * @param serverHandler a transport server-side handler
+   * @param exHandler     an exception handler
+   * @return the created transport
+   */
+  Transport create(int port,
+                   EventHandler<TransportEvent> clientHandler,
+                   EventHandler<TransportEvent> serverHandler,
+                   EventHandler<Exception> exHandler);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
new file mode 100644
index 0000000..4424e8d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
+
+/**
+ * Unchecked (runtime) exception of the network service.
+ */
+public class NetworkRuntimeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs a new network runtime exception with the specified detail message.
+   *
+   * @param s the detail message
+   */
+  public NetworkRuntimeException(final String s) {
+    super(s);
+  }
+
+  /**
+   * Constructs a new network runtime exception with the specified cause.
+   *
+   * @param e the cause
+   */
+  public NetworkRuntimeException(final Throwable e) {
+    super(e);
+  }
+
+  /**
+   * Constructs a new network runtime exception with the specified detail message and cause.
+   *
+   * @param s the detail message
+   * @param e the cause
+   */
+  public NetworkRuntimeException(final String s, final Throwable e) {
+    super(s, e);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
new file mode 100644
index 0000000..6e406d3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
+
+/**
+ * This exception is thrown by a child task
+ * when it is told that its Parent died
+ */
+public class ParentDeadException extends Exception {
+
+ private static final long serialVersionUID = 7636542209271151579L;
+
+ public ParentDeadException() {
+ }
+
+ public ParentDeadException(final String message) {
+ super(message);
+ }
+
+ public ParentDeadException(final Throwable cause) {
+ super(cause);
+ }
+
+ public ParentDeadException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public ParentDeadException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
new file mode 100644
index 0000000..df0d392
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
new file mode 100644
index 0000000..e8c7842
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * Task-start handler that registers the identifier of the starting task
+ * with the network service.
+ */
+public class BindNSToTask implements EventHandler<TaskStart> {
+
+  private final NetworkService<?> networkService;
+  private final IdentifierFactory identifierFactory;
+
+  /**
+   * @param ns    the network service the task identifier is registered with
+   * @param idFac the factory used to build an identifier from the task id
+   */
+  @Inject
+  public BindNSToTask(
+      final NetworkService<?> ns,
+      @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory idFac) {
+    this.networkService = ns;
+    this.identifierFactory = idFac;
+  }
+
+  /**
+   * Builds an identifier from the id of the starting task and registers it
+   * with the network service.
+   *
+   * @param task the task start event
+   */
+  @Override
+  public void onNext(final TaskStart task) {
+    final String taskId = task.getId();
+    this.networkService.registerId(this.identifierFactory.getNewInstance(taskId));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
new file mode 100644
index 0000000..e169624
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+
+/**
+ * {@link TransportFactory} that creates {@link NettyMessagingTransport} instances.
+ */
+public class MessagingTransportFactory implements TransportFactory {
+
+  @Inject
+  public MessagingTransportFactory() {
+  }
+
+  /**
+   * Creates a messaging transport on the address returned by
+   * {@link NetUtils#getLocalAddress()} and registers the exception handler with it.
+   * Both event handlers are wrapped in a {@link SyncStage} before being handed
+   * to the transport.
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client side handler
+   * @param serverHandler a transport server side handler
+   * @param exHandler     an exception handler
+   * @return the created transport
+   */
+  @Override
+  public Transport create(final int port,
+                          final EventHandler<TransportEvent> clientHandler,
+                          final EventHandler<TransportEvent> serverHandler,
+                          final EventHandler<Exception> exHandler) {
+
+    final String localAddress = NetUtils.getLocalAddress();
+    final SyncStage<TransportEvent> clientStage = new SyncStage<>(clientHandler);
+    final SyncStage<TransportEvent> serverStage = new SyncStage<>(serverHandler);
+
+    // NOTE(review): 3 and 10000 are presumably the retry count and the retry timeout -
+    // confirm against the NettyMessagingTransport constructor.
+    final Transport messagingTransport =
+        new NettyMessagingTransport(localAddress, port, clientStage, serverStage, 3, 10000);
+
+    messagingTransport.registerErrorHandler(exHandler);
+    return messagingTransport;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
new file mode 100644
index 0000000..a4b4538
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A connection from the network service to a single destination identifier.
+ *
+ * @param <T> the type of the objects written to the connection
+ */
+class NSConnection<T> implements Connection<T> {
+
+  private static final Logger LOG = Logger.getLogger(NSConnection.class.getName());
+
+  private final Identifier srcId;
+  private final Identifier destId;
+  private final LinkListener<NSMessage<T>> listener;
+  private final NetworkService<T> service;
+  private final Codec<NSMessage<T>> codec;
+
+  // link can change when an endpoint physical address changes
+  private Link<NSMessage<T>> link;
+
+  /**
+   * Constructs a connection
+   *
+   * @param srcId    a source identifier
+   * @param destId   a destination identifier
+   * @param listener a link listener; it is wrapped in an {@link NSMessageLinkListener}
+   * @param service  a network service; also supplies the codec and identifier factory
+   *                 used to build the message codec
+   */
+  NSConnection(final Identifier srcId, final Identifier destId,
+               final LinkListener<T> listener, final NetworkService<T> service) {
+    this.srcId = srcId;
+    this.destId = destId;
+    this.listener = new NSMessageLinkListener<>(listener);
+    this.service = service;
+    this.codec = new NSMessageCodec<>(service.getCodec(), service.getIdentifierFactory());
+  }
+
+  /**
+   * Opens the connection: looks up the address of the destination identifier
+   * through the name client of the service and opens a link to it through the
+   * transport of the service.
+   *
+   * @throws NetworkException if the lookup or the link creation fails; a
+   *                          {@link NamingException} is thrown directly when the
+   *                          lookup returns no address
+   */
+  @Override
+  public void open() throws NetworkException {
+
+    LOG.log(Level.FINE, "looking up {0}", this.destId);
+
+    try {
+      // naming lookup
+      final InetSocketAddress addr = this.service.getNameClient().lookup(this.destId);
+      if (addr != null) {
+        LOG.log(Level.FINE, "Resolved {0} to {1}", new Object[]{this.destId, addr});
+
+        // connect to a remote address
+        this.link = this.service.getTransport().open(addr, this.codec, this.listener);
+        LOG.log(Level.FINE, "Transport returned a link {0}", this.link);
+        return;
+      }
+    } catch (final Exception ex) {
+      LOG.log(Level.WARNING, "Could not open " + this.destId, ex);
+      throw new NetworkException(ex);
+    }
+
+    // Only reached when the lookup returned null. Thrown outside the try block so that
+    // the generic handler above does not log the failure a second time and wrap the
+    // NamingException in yet another NetworkException.
+    final NetworkException ex = new NamingException("Cannot resolve " + this.destId);
+    LOG.log(Level.WARNING, "Cannot resolve " + this.destId, ex);
+    throw ex;
+  }
+
+  /**
+   * Writes an object to the connection, wrapped in an {@link NSMessage} that
+   * carries the source and destination identifiers of this connection.
+   *
+   * @param obj an object of type T
+   * @throws NetworkException if the connection has not been opened successfully
+   *                          or the link fails to write the message
+   */
+  @Override
+  public void write(final T obj) throws NetworkException {
+    final Link<NSMessage<T>> openLink = this.link;
+    if (openLink == null) {
+      // No link yet: report it through the declared exception type instead of
+      // failing with a NullPointerException.
+      throw new NetworkException(
+          new IllegalStateException("Connection to " + this.destId + " is not open"));
+    }
+
+    try {
+      openLink.write(new NSMessage<T>(this.srcId, this.destId, obj));
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Could not write to " + this.destId, ex);
+      throw new NetworkException(ex);
+    }
+  }
+
+  /**
+   * Closes the connection by removing its destination identifier from the
+   * network service. The link itself is not touched here.
+   */
+  @Override
+  public void close() throws NetworkException {
+    this.service.remove(this.destId);
+  }
+}
+
+/**
+ * No-op link listener.
+ * <p>
+ * The {@code LinkListener<T>} passed to the constructor is not stored, and
+ * {@code messageReceived} has an empty body, so every message handed to this
+ * listener is ignored.
+ *
+ * @param <T> the data type of the {@link NSMessage}s this listener accepts
+ */
+final class NSMessageLinkListener<T> implements LinkListener<NSMessage<T>> {
+
+ NSMessageLinkListener(final LinkListener<T> listener) {
+ }
+
+ @Override
+ public void messageReceived(final NSMessage<T> message) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
new file mode 100644
index 0000000..063b9e3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link Message} implementation used by the network service.
+ *
+ * @param <T> the type of the data objects carried by the message
+ */
+public class NSMessage<T> implements Message<T> {
+  private final Identifier srcId;
+  private final Identifier destId;
+  private final List<T> data;
+
+  /**
+   * Constructs a network service message that carries a list of data objects.
+   * The list is stored as given; it is not copied.
+   *
+   * @param srcId  a source identifier
+   * @param destId a destination identifier
+   * @param data   a list of data of type T
+   */
+  public NSMessage(final Identifier srcId, final Identifier destId, final List<T> data) {
+    this.srcId = srcId;
+    this.destId = destId;
+    this.data = data;
+  }
+
+  /**
+   * Constructs a network service message that carries a single data object,
+   * held in a freshly created one-element list.
+   *
+   * @param srcId  a source identifier
+   * @param destId a destination identifier
+   * @param data   data of type T
+   */
+  public NSMessage(final Identifier srcId, final Identifier destId, final T data) {
+    final List<T> single = new ArrayList<>(1);
+    single.add(data);
+
+    this.srcId = srcId;
+    this.destId = destId;
+    this.data = single;
+  }
+
+  /**
+   * Gets the source identifier
+   *
+   * @return the source identifier
+   */
+  @Override
+  public Identifier getSrcId() {
+    return this.srcId;
+  }
+
+  /**
+   * Gets the destination identifier
+   *
+   * @return the destination identifier
+   */
+  @Override
+  public Identifier getDestId() {
+    return this.destId;
+  }
+
+  /**
+   * Gets the data carried by this message
+   *
+   * @return the list of data objects held by this message
+   */
+  @Override
+  public List<T> getData() {
+    return this.data;
+  }
+}