You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:03:45 UTC

[14/51] [abbrv] [partial] Initial merge of Wake, Tang and REEF into one repository and project

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
deleted file mode 100644
index aec2227..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.microsoft.reef.annotations.audience.DriverSide;
-import com.microsoft.reef.driver.context.ActiveContext;
-import com.microsoft.reef.driver.context.ContextConfiguration;
-import com.microsoft.reef.driver.context.ServiceConfiguration;
-import com.microsoft.reef.driver.evaluator.AllocatedEvaluator;
-import com.microsoft.reef.io.data.loading.api.DataLoadingRequestBuilder;
-import com.microsoft.reef.io.data.loading.api.DataLoadingService;
-import com.microsoft.reef.io.data.loading.api.DataSet;
-import com.microsoft.tang.Configuration;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.exceptions.BindException;
-
-/**
- * An implementation of {@link DataLoadingService}
- * that uses the Hadoop {@link InputFormat} to find
- * partitions of data & request resources.
- * <p/>
- * The InputFormat is injected using a Tang external constructor
- * <p/>
- * It also tries to obtain data locality in a greedy
- * fashion using {@link EvaluatorToPartitionMapper}
- */
-@DriverSide
-public class InputFormatLoadingService<K, V> implements DataLoadingService {
-
-  private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());
-
-  private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";
-
-  private static final String COMPUTE_CONTEXT_PREFIX =
-      "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
-
-  private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;
-  private final int numberOfPartitions;
-
-  private final boolean inMemory;
-
-  private final String inputFormatClass;
-
-  private final String inputPath;
-
-  @Inject
-  public InputFormatLoadingService(
-      final InputFormat<K, V> inputFormat,
-      final JobConf jobConf,
-      final @Parameter(DataLoadingRequestBuilder.NumberOfDesiredSplits.class) int numberOfDesiredSplits,
-      final @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
-      final @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass,
-      final @Parameter(JobConfExternalConstructor.InputPath.class) String inputPath) {
-
-    this.inMemory = inMemory;
-    this.inputFormatClass = inputFormatClass;
-    this.inputPath = inputPath;
-
-
-    try {
-
-      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
-      if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
-      }
-
-      this.numberOfPartitions = inputSplits.length;
-      LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);
-
-      this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);
-
-    } catch (final IOException e) {
-      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
-    }
-  }
-
-  @Override
-  public int getNumberOfPartitions() {
-    return this.numberOfPartitions;
-  }
-
-  @Override
-  public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {
-
-    final NumberedSplit<InputSplit> numberedSplit =
-        this.evaluatorToPartitionMapper.getInputSplit(
-            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
-            allocatedEvaluator.getId());
-
-    return ContextConfiguration.CONF
-        .set(ContextConfiguration.IDENTIFIER, DATA_LOAD_CONTEXT_PREFIX + numberedSplit.getIndex())
-        .build();
-  }
-
-  @Override
-  public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {
-
-    try {
-
-      final NumberedSplit<InputSplit> numberedSplit =
-          this.evaluatorToPartitionMapper.getInputSplit(
-              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
-              allocatedEvaluator.getId());
-
-      final Configuration serviceConfiguration = ServiceConfiguration.CONF
-          .set(ServiceConfiguration.SERVICES,
-              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
-          .build();
-
-      return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
-          .bindImplementation(
-              DataSet.class,
-              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
-          .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
-          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
-          .bindNamedParameter(
-              InputSplitExternalConstructor.SerializedInputSplit.class,
-              WritableSerializer.serialize(numberedSplit.getEntry()))
-          .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
-          .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
-          .build();
-
-    } catch (final BindException ex) {
-      final String evalId = allocatedEvaluator.getId();
-      final String msg = "Unable to create configuration for evaluator " + evalId;
-      LOG.log(Level.WARNING, msg, ex);
-      throw new RuntimeException(msg, ex);
-    }
-  }
-
-  @Override
-  public String getComputeContextIdPrefix() {
-    return COMPUTE_CONTEXT_PREFIX;
-  }
-
-  @Override
-  public boolean isComputeContext(final ActiveContext context) {
-    return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
-  }
-
-  @Override
-  public boolean isDataLoadedContext(final ActiveContext context) {
-    return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
deleted file mode 100644
index efa3c33..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import javax.inject.Inject;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.microsoft.reef.annotations.audience.TaskSide;
-import com.microsoft.tang.ExternalConstructor;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-
-/**
- * A Tang external constructor to inject an InputSplit
- * by deserializing the serialized input split assigned
- * to this evaluator
- */
-@TaskSide
-public class InputSplitExternalConstructor implements ExternalConstructor<InputSplit> {
-
-  @NamedParameter
-  public static final class SerializedInputSplit implements Name<String> { }
-
-  private final InputSplit inputSplit;
-
-  @Inject
-  public InputSplitExternalConstructor(
-      final JobConf jobConf,
-      @Parameter(SerializedInputSplit.class) final String serializedInputSplit){
-    this.inputSplit = WritableSerializer.deserialize(serializedInputSplit, jobConf);
-  }
-
-  @Override
-  public InputSplit newInstance() {
-    return inputSplit;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
deleted file mode 100644
index dab18f5..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import com.microsoft.tang.ExternalConstructor;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import javax.inject.Inject;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class JobConfExternalConstructor implements ExternalConstructor<JobConf> {
-
-  private static final Logger LOG = Logger.getLogger(JobConfExternalConstructor.class.getName());
-
-  private final String inputFormatClassName;
-  private final String inputPath;
-
-  @NamedParameter()
-  public static final class InputFormatClass implements Name<String> {
-  }
-
-  @NamedParameter(default_value = "NULL")
-  public static final class InputPath implements Name<String> {
-  }
-
-  @Inject
-  public JobConfExternalConstructor(
-      final @Parameter(InputFormatClass.class) String inputFormatClassName,
-      final @Parameter(InputPath.class) String inputPath) {
-    this.inputFormatClassName = inputFormatClassName;
-    this.inputPath = inputPath;
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public JobConf newInstance() {
-
-    final JobConf jobConf = new JobConf();
-
-    try {
-
-      final Class<? extends InputFormat> inputFormatClass =
-          (Class<? extends InputFormat>) Class.forName(this.inputFormatClassName);
-
-      jobConf.setInputFormat(inputFormatClass);
-
-      final Method addInputPath =
-          inputFormatClass.getMethod("addInputPath", JobConf.class, Path.class);
-
-      addInputPath.invoke(inputFormatClass, jobConf, new Path(this.inputPath));
-
-    } catch (final ClassNotFoundException ex) {
-      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
-          + " ClassNotFoundException while creating newInstance of JobConf", ex);
-    } catch (final InvocationTargetException | IllegalAccessException ex) {
-      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
-          + ".addInputPath() method exists, but cannot be called.", ex);
-    } catch (final NoSuchMethodException ex) {
-      LOG.log(Level.INFO, "{0}.addInputPath() method does not exist", this.inputFormatClassName);
-    }
-
-    return jobConf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
deleted file mode 100644
index 708729b..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import org.apache.hadoop.mapred.InputSplit;
-
-/**
- * A tuple of an object of type E and an integer index
- * Used inside {@link EvaluatorToPartitionMapper} to
- * mark the partitions associated with each {@link InputSplit}
- * 
- * @param <E>
- */
-final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
-  private final E entry;
-  private final int index;
-
-  public NumberedSplit(final E entry, final int index) {
-    super();
-    if (entry == null) {
-      throw new IllegalArgumentException("Entry cannot be null");
-    }
-    this.entry = entry;
-    this.index = index;
-  }
-
-  public E getEntry() {
-    return entry;
-  }
-
-  public int getIndex() {
-    return index;
-  }
-
-  @Override
-  public String toString() {
-    return "InputSplit-" + index;
-  }
-
-  @Override
-  public int compareTo(final NumberedSplit<E> o) {
-    if (this.index == o.index)
-      return 0;
-    if (this.index < o.index)
-      return -1;
-    else
-      return 1;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
deleted file mode 100644
index c3515de..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.microsoft.reef.io.serialization.Codec;
-
-/**
- * A serializer class that serializes {@link Writable}s
- * into String using the below {@link Codec} that
- * encodes & decodes {@link Writable}s
- * By default this stores the class name in the serialized
- * form so that the specific type can be instantiated on
- * de-serialization. However, this also needs the jobconf
- * to passed in while de-serialization
- */
-public class WritableSerializer {
-  public static <E extends Writable> String serialize(E writable){
-    final WritableCodec<E> writableCodec = new WritableCodec<>();
-    return Base64.encodeBase64String(writableCodec.encode(writable));
-  }
-  
-  public static <E extends Writable> E deserialize(String serializedWritable){
-    final WritableCodec<E> writableCodec = new WritableCodec<>();
-    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
-  }
-  
-  public  static <E extends Writable> E deserialize(String serializedWritable, JobConf jobConf){
-    final WritableCodec<E> writableCodec = new WritableCodec<>(jobConf);
-    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
-  }
-  
-  static class WritableCodec<E extends Writable> implements Codec<E>{
-    private final JobConf jobConf;
-    
-    public WritableCodec(JobConf jobConf) {
-      this.jobConf = jobConf;
-    }
-
-    public WritableCodec() {
-      this.jobConf = new JobConf();
-    }
-
-    @Override
-    public E decode(byte[] bytes) {
-      final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-      try(DataInputStream dais = new DataInputStream(bais)){
-        final String className = dais.readUTF();
-        E writable = (E) ReflectionUtils.newInstance(Class.forName(className), jobConf);
-        writable.readFields(dais);
-        return writable;
-      } catch (IOException e) {
-        throw new RuntimeException("Could not de-serialize JobConf", e);
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("Could not instantiate specific writable class", e);
-      }
-    }
-
-    @Override
-    public byte[] encode(E writable) {
-      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      try(final DataOutputStream daos = new DataOutputStream(baos)) {
-        daos.writeUTF(writable.getClass().getName());
-        writable.write(daos);
-        return baos.toByteArray();
-      } catch (IOException e) {
-        throw new RuntimeException("Could not serialize JobConf", e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
deleted file mode 100644
index d395eda..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Cache for network and naming services
- */
-public interface Cache<K,V> {
-  /**
-   *  Constructs with timeout
-   *  key is evicted when it's not used for timeout milli-seconds
-   */
-  
-  /**
-   * Returns a value for the key if cached; otherwise creates, caches and returns
-   * When it creates a value for a key, only one callable for the key is executed
-   * 
-   * @param key a key
-   * @param callable a value fetcher
-   * @return a value
-   * @throws NetworkException
-   */
-  public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
- 
-  /**
-   * Invalidates a key from the cache
-   * @param key a key
-   */
-  public void invalidate(K key);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
deleted file mode 100644
index bf51233..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-
-/**
- * Connection between two end-points named by identifiers.
- * 
- * @param <T> type
- */
-public interface Connection<T> extends AutoCloseable {
-
-  /**
-   * Opens the connection.
-   * 
-   * @throws NetworkException
-   */
-  void open() throws NetworkException;
-
-  /**
-   * Writes an object to the connection.
-   * 
-   * @param obj
-   * @throws NetworkException
-   */
-  void write(T obj) throws NetworkException;
-
-  /**
-   * Closes the connection.
-   * 
-   * @throws NetworkException
-   */
-  @Override
-  void close() throws NetworkException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
deleted file mode 100644
index 79cd9aa..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.Identifier;
-
-/**
- * Factory that creates a new connection 
- *
- * @param <T> type
- */
-public interface ConnectionFactory<T> {
-  
-  /**
-   * Creates a new connection
-   * 
-   * @param destId a destination identifier
-   * @return a connection
-   */
-  public Connection<T> newConnection(Identifier destId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
deleted file mode 100644
index f35c8d7..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.Identifier;
-
-/**
- * Network message
- * 
- * @param <T>
- */
-public interface Message<T> {
-  
-  /**
-   * Gets a source identifier
-   * 
-   * @return an identifier
-   */
-  Identifier getSrcId();
-
-  /**
-   * Gets a destination identifier
-   * 
-   * @return an identifier
-   */
-  Identifier getDestId();
-
-  /**
-   * Gets data
-   * 
-   * @return an iterable of data objects
-   */
-  Iterable<T> getData();
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
deleted file mode 100644
index ce1731c..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-
-/**
- * Factory that creates a transport
- */
-public interface TransportFactory {
-
-  /**
-   * Creates a transport
-   *
-   * @param port          a listening port
-   * @param clientHandler a transport client-side handler
-   * @param serverHandler a transport server-side handler
-   * @param exHandler     an exception handler
-   * @return
-   */
-  public Transport create(int port,
-                          EventHandler<TransportEvent> clientHandler,
-                          EventHandler<TransportEvent> serverHandler,
-                          EventHandler<Exception> exHandler);
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
deleted file mode 100644
index 3df006f..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
-
-/**
- * Network service resourcemanager exception
- */
-public class NetworkRuntimeException extends RuntimeException {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Constructs a new network resourcemanager exception with the specified detail message and cause
-   *
-   * @param s the detailed message
-   * @param e the cause
-   */
-  public NetworkRuntimeException(String s, Throwable e) {
-    super(s, e);
-  }
-
-  /**
-   * Constructs a new network resourcemanager exception with the specified detail message
-   *
-   * @param s the detailed message
-   */
-  public NetworkRuntimeException(String s) {
-    super(s);
-  }
-
-  /**
-   * Constructs a new network resourcemanager exception with the specified cause
-   *
-   * @param e the cause
-   */
-  public NetworkRuntimeException(Throwable e) {
-    super(e);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
deleted file mode 100644
index 7374497..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2013 Microsoft.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
-
-/**
- * Checked exception thrown by a child task once it has been told that
- * its parent died. The constructors mirror those of {@link Exception}.
- */
-public class ParentDeadException extends Exception {
-  private static final long serialVersionUID = 7636542209271151579L;
-
-  public ParentDeadException() {
-    super();
-  }
-
-  public ParentDeadException(final String message) {
-    super(message);
-  }
-
-  public ParentDeadException(final Throwable cause) {
-    super(cause);
-  }
-
-  public ParentDeadException(final String message, final Throwable cause) {
-    super(message, cause);
-  }
-
-  public ParentDeadException(final String message, final Throwable cause,
-                             final boolean enableSuppression, final boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
deleted file mode 100644
index 43a646d..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
deleted file mode 100644
index 25c8b61..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import javax.inject.Inject;
-
-import com.microsoft.reef.task.events.TaskStart;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-
-/** TaskStart handler that registers an identifier built from the task id with the NetworkService. */
-public class BindNSToTask implements EventHandler<TaskStart> {
-  private final NetworkService<?> ns;
-  private final IdentifierFactory idFac;
-
-  @Inject
-  public BindNSToTask(final NetworkService<?> ns,
-      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
-    this.idFac = idFac;
-    this.ns = ns;
-  }
-
-  /** Turns the task id into an identifier and registers it with the network service. */
-  @Override
-  public void onNext(final TaskStart task) {
-    this.ns.registerId(this.idFac.getNewInstance(task.getId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
deleted file mode 100644
index dcc7bb0..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.impl.SyncStage;
-import com.microsoft.wake.remote.NetUtils;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-import com.microsoft.wake.remote.transport.netty.NettyMessagingTransport;
-
-import javax.inject.Inject;
-
-/** {@link TransportFactory} that builds Netty-based messaging transports */
-public class MessagingTransportFactory implements TransportFactory {
-
-  @Inject
-  public MessagingTransportFactory() {
-  }
-
-  /**
-   * Builds a transport on the local address and registers the error handler on it
-   *
-   * @param port          the port to listen on
-   * @param clientHandler handler for client side transport events
-   * @param serverHandler handler for server side transport events
-   * @param exHandler     handler registered for transport errors
-   * @return the newly created transport
-   */
-  @Override
-  public Transport create(final int port,
-                          final EventHandler<TransportEvent> clientHandler,
-                          final EventHandler<TransportEvent> serverHandler,
-                          final EventHandler<Exception> exHandler) {
-    final SyncStage<TransportEvent> clientStage = new SyncStage<>(clientHandler);
-    final SyncStage<TransportEvent> serverStage = new SyncStage<>(serverHandler);
-    // NOTE(review): 3 and 10000 look like a retry count and a timeout in ms - confirm.
-    final Transport messaging = new NettyMessagingTransport(
-        NetUtils.getLocalAddress(), port, clientStage, serverStage, 3, 10000);
-    messaging.registerErrorHandler(exHandler);
-    return messaging;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
deleted file mode 100644
index 2df5995..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.naming.exception.NamingException;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.transport.Link;
-import com.microsoft.wake.remote.transport.LinkListener;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A connection from the network service; {@link #open()} it before writing
- */
-class NSConnection<T> implements Connection<T> {
-  private static final Logger LOG = Logger.getLogger(NSConnection.class.getName());
-
-  private final Identifier srcId;
-  private final Identifier destId;
-  private final LinkListener<NSMessage<T>> listener;
-  private final NetworkService<T> service;
-  private final Codec<NSMessage<T>> codec;
-
-  // link can change when an endpoint physical address changes; null until open() succeeds
-  private Link<NSMessage<T>> link;
-
-  /**
-   * Constructs a connection
-   *
-   * @param srcId    a source identifier
-   * @param destId   a destination identifier
-   * @param listener a link listener
-   * @param service  a network service
-   */
-  NSConnection(final Identifier srcId, final Identifier destId,
-               final LinkListener<T> listener, final NetworkService<T> service) {
-    this.srcId = srcId;
-    this.destId = destId;
-    this.listener = new NSMessageLinkListener<>(listener);
-    this.service = service;
-    this.codec = new NSMessageCodec<>(service.getCodec(), service.getIdentifierFactory());
-  }
-
-  /**
-   * Opens the connection: looks up the destination address and opens a transport link to it.
-   *
-   * @throws NetworkException if the lookup or transport fails, or the destination cannot be resolved
-   */
-  @Override
-  public void open() throws NetworkException {
-    LOG.log(Level.FINE, "looking up {0}", this.destId);
-    try {
-      // naming lookup
-      final InetSocketAddress addr = this.service.getNameClient().lookup(this.destId);
-      if (addr != null) {
-        LOG.log(Level.FINE, "Resolved {0} to {1}", new Object[] { this.destId, addr });
-        // connect to a remote address
-        this.link = this.service.getTransport().open(addr, this.codec, this.listener);
-        LOG.log(Level.FINE, "Transport returned a link {0}", this.link);
-        return;
-      }
-    } catch (final Exception ex) {
-      LOG.log(Level.WARNING, "Could not open " + this.destId, ex);
-      throw new NetworkException(ex);
-    }
-    // Raised outside the try so it is logged once and not re-wrapped by the catch above.
-    final NetworkException unresolved = new NamingException("Cannot resolve " + this.destId);
-    LOG.log(Level.WARNING, "Cannot resolve " + this.destId, unresolved);
-    throw unresolved;
-  }
-
-  /**
-   * Writes an object to the connection
-   *
-   * @param obj an object of type T
-   * @throws NetworkException if the connection is not open or the link write fails
-   */
-  @Override
-  public void write(final T obj) throws NetworkException {
-    final Link<NSMessage<T>> current = this.link;
-    if (current == null) {
-      throw new NetworkException(new IllegalStateException("Connection to " + this.destId + " is not open"));
-    }
-    try {
-      current.write(new NSMessage<T>(this.srcId, this.destId, obj));
-    } catch (final IOException ex) {
-      LOG.log(Level.WARNING, "Could not write to " + this.destId, ex);
-      throw new NetworkException(ex);
-    }
-  }
-
-  /** Unregisters this connection from the service; the link itself is not touched here */
-  @Override
-  public void close() throws NetworkException {
-    this.service.remove(this.destId);
-  }
-}
-
-/**
- * No-op link listener: the listener passed to the constructor is not
- * retained and received messages are dropped.
- * @param <T> payload type of the NSMessage
- */
-final class NSMessageLinkListener<T> implements LinkListener<NSMessage<T>> {
-  // NOTE(review): listener is ignored - confirm that no forwarding is intended.
-  NSMessageLinkListener(final LinkListener<T> listener) {
-  }
-
-  @Override
-  public void messageReceived(final NSMessage<T> message) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
deleted file mode 100644
index 63a8dc9..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.wake.Identifier;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@link Message} implementation used by the network service: a source
- * identifier, a destination identifier and a list of data items
- *
- * @param <T> type
- */
-public class NSMessage<T> implements Message<T> {
-  private final Identifier srcId;
-  private final Identifier destId;
-  private final List<T> data;
-
-  /**
-   * Builds a message that carries a single data item
-   *
-   * @param srcId  a source identifier
-   * @param destId a destination identifier
-   * @param data   the one item of type T, placed in a freshly created list
-   */
-  public NSMessage(final Identifier srcId, final Identifier destId, final T data) {
-    // the list constructor stores the new list; the item is added afterwards
-    this(srcId, destId, new ArrayList<T>(1));
-    this.data.add(data);
-  }
-
-  /**
-   * Builds a message around an existing list of data items
-   *
-   * @param srcId  a source identifier
-   * @param destId a destination identifier
-   * @param data   a list of data of type T; the list is kept as passed in, not copied
-   */
-  public NSMessage(final Identifier srcId, final Identifier destId, final List<T> data) {
-    this.srcId = srcId;
-    this.destId = destId;
-    this.data = data;
-  }
-
-  /**
-   * Gets a source identifier
-   *
-   * @return an identifier
-   */
-  public Identifier getSrcId() {
-    return this.srcId;
-  }
-
-  /**
-   * Gets a destination identifier
-   *
-   * @return an identifier
-   */
-  public Identifier getDestId() {
-    return this.destId;
-  }
-
-  /**
-   * Gets data
-   *
-   * @return the list held by this message (the same instance, not a copy)
-   */
-  public List<T> getData() {
-    return this.data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
deleted file mode 100644
index 1cdd7a7..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.microsoft.reef.io.network.exception.NetworkRuntimeException;
-import com.microsoft.reef.io.network.proto.ReefNetworkServiceProtos.NSMessagePBuf;
-import com.microsoft.reef.io.network.proto.ReefNetworkServiceProtos.NSRecordPBuf;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.Codec;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Network service message codec. Streams the message through a
- * {@link StreamingCodec} when the payload codec is one; otherwise wraps the
- * encoded records in a protocol-buffer message.
- *
- * @param <T> type
- */
-public class NSMessageCodec<T> implements Codec<NSMessage<T>> {
-
-  private final Codec<T> codec;
-  private final IdentifierFactory factory;
-  private final boolean isStreamingCodec;
-
-  /**
-   * Constructs a network service message codec
-   *
-   * @param codec   a codec
-   * @param factory an identifier factory
-   */
-  public NSMessageCodec(final Codec<T> codec, final IdentifierFactory factory) {
-    this.codec = codec;
-    this.factory = factory;
-    this.isStreamingCodec = codec instanceof StreamingCodec;
-  }
-
-  /**
-   * Encodes a network service message to bytes
-   *
-   * @param obj a message
-   * @return bytes
-   * @throws NetworkRuntimeException if writing to the stream fails
-   */
-  @Override
-  public byte[] encode(final NSMessage<T> obj) {
-    if (isStreamingCodec) {
-      final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
-      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-           DataOutputStream daos = new DataOutputStream(baos)) {
-        // stream layout: source id, destination id, record count, then each record
-        daos.writeUTF(obj.getSrcId().toString());
-        daos.writeUTF(obj.getDestId().toString());
-        daos.writeInt(obj.getData().size());
-        for (final T rec : obj.getData()) {
-          streamingCodec.encodeToStream(rec, daos);
-        }
-        daos.flush();
-        return baos.toByteArray();
-      } catch (final IOException e) {
-        throw new NetworkRuntimeException("Unable to encode network service message", e);
-      }
-    }
-    final NSMessagePBuf.Builder pbuf = NSMessagePBuf.newBuilder();
-    pbuf.setSrcid(obj.getSrcId().toString());
-    pbuf.setDestid(obj.getDestId().toString());
-    for (final T rec : obj.getData()) {
-      final NSRecordPBuf.Builder rbuf = NSRecordPBuf.newBuilder();
-      rbuf.setData(ByteString.copyFrom(codec.encode(rec)));
-      pbuf.addMsgs(rbuf);
-    }
-    return pbuf.build().toByteArray();
-  }
-
-  /**
-   * Decodes a network service message from bytes
-   *
-   * @param buf bytes
-   * @return a message
-   * @throws NetworkRuntimeException if the bytes cannot be read or parsed
-   */
-  @Override
-  public NSMessage<T> decode(final byte[] buf) {
-    if (isStreamingCodec) {
-      final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
-      try (ByteArrayInputStream bais = new ByteArrayInputStream(buf);
-           DataInputStream dais = new DataInputStream(bais)) {
-        final Identifier srcId = factory.getNewInstance(dais.readUTF());
-        final Identifier destId = factory.getNewInstance(dais.readUTF());
-        final int size = dais.readInt();
-        final List<T> list = new ArrayList<T>(size);
-        for (int i = 0; i < size; i++) {
-          list.add(streamingCodec.decodeFromStream(dais));
-        }
-        return new NSMessage<>(srcId, destId, list);
-      } catch (final IOException e) {
-        throw new NetworkRuntimeException("Unable to decode network service message", e);
-      }
-    }
-    final NSMessagePBuf pbuf;
-    try {
-      pbuf = NSMessagePBuf.parseFrom(buf);
-    } catch (final InvalidProtocolBufferException e) {
-      // the cause travels with the exception; no printStackTrace() needed
-      throw new NetworkRuntimeException(e);
-    }
-    final List<T> list = new ArrayList<T>();
-    for (final NSRecordPBuf rbuf : pbuf.getMsgsList()) {
-      list.add(codec.decode(rbuf.getData().toByteArray()));
-    }
-    return new NSMessage<T>(
-        factory.getNewInstance(pbuf.getSrcid()), factory.getNewInstance(pbuf.getDestid()), list);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
deleted file mode 100644
index f5804ec..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.evaluator.context.events.ContextStop;
-import com.microsoft.reef.io.network.naming.NameServer;
-import com.microsoft.wake.EventHandler;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** ContextStop handler that closes the injected NameServer. */
-public final class NameServiceCloseHandler implements EventHandler<ContextStop> {
-
-  private static final Logger LOG = Logger.getLogger(NameServiceCloseHandler.class.getName());
-  private final AutoCloseable toClose;
-
-  @Inject
-  public NameServiceCloseHandler(final NameServer toClose) {
-    this.toClose = toClose;
-  }
-
-  /** Best-effort close: any Throwable is logged at SEVERE and not propagated. */
-  @Override
-  public void onNext(final ContextStop event) {
-    try {
-      LOG.log(Level.FINEST, "Closing {0}", this.toClose);
-      this.toClose.close();
-    } catch (final Throwable ex) {
-      LOG.log(Level.SEVERE, "Exception while closing " + this.toClose, ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
deleted file mode 100644
index c86a7cf..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.Tuple;
-import com.microsoft.reef.io.naming.Naming;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.ConnectionFactory;
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.reef.io.network.naming.NameCache;
-import com.microsoft.reef.io.network.naming.NameClient;
-import com.microsoft.reef.io.network.naming.NameLookupClient;
-import com.microsoft.reef.io.network.naming.NameServerParameters;
-import com.microsoft.tang.Injector;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.wake.*;
-import com.microsoft.wake.impl.LoggingEventHandler;
-import com.microsoft.wake.impl.SingleThreadStage;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.LinkListener;
-import com.microsoft.wake.remote.transport.Transport;
-
-import javax.inject.Inject;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Network service for Task. Owns a transport and a name client, and caches connections by destination
- */
-public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
-
-  private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
-
-  private static final int retryCount;
-  private static final int retryTimeout;
-
-  static {
-    try {
-      final Injector injector = Tang.Factory.getTang().newInjector();
-      retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
-      retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
-    } catch (final InjectionException ex) {
-      final String msg = "Exception while trying to find default values for retryCount & Timeout";
-      LOG.log(Level.SEVERE, msg, ex);
-      throw new RuntimeException(msg, ex);
-    }
-  }
-
-  private Identifier myId;
-  private final IdentifierFactory factory;
-  private final Codec<T> codec;
-  private final Transport transport;
-  private final NameClient nameClient;
-
-  private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
-
-  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
-  private final EStage<Identifier> nameServiceUnregisteringStage;
-
-  /** Same as the injectable constructor, using the default retry count and timeout resolved above. */
-  public NetworkService(final IdentifierFactory factory, final int nsPort,
-                        final String nameServerAddr, final int nameServerPort,
-                        final Codec<T> codec, final TransportFactory tpFactory,
-                        final EventHandler<Message<T>> recvHandler,
-                        final EventHandler<Exception> exHandler) {
-    this(factory, nsPort, nameServerAddr, nameServerPort,
-        retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler);
-  }
-
-  /** Creates the transport, the name client and the two stages that (un)register identifiers. */
-  @Inject
-  public NetworkService(
-      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory,
-      final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort,
-      final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr,
-      final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort,
-      final @Parameter(NameLookupClient.RetryCount.class) int retryCount,
-      final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout,
-      final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec,
-      final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory,
-      final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler,
-      final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) {
-    this.factory = factory;
-    this.codec = codec;
-    this.transport = tpFactory.create(nsPort,
-        new LoggingEventHandler<TransportEvent>(),
-        new MessageHandler<T>(recvHandler, codec, factory), exHandler);
-
-    this.nameClient = new NameClient(nameServerAddr, nameServerPort,
-        factory, retryCount, retryTimeout, new NameCache(30000));
-
-    this.nameServiceRegisteringStage = new SingleThreadStage<>(
-        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
-      @Override
-      public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
-        try {
-          nameClient.register(tuple.getKey(), tuple.getValue());
-          LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
-        } catch (final Exception ex) {
-          final String msg = "Unable to register " + tuple.getKey() + " with name service";
-          LOG.log(Level.WARNING, msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-      }
-    }, 5);
-
-    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
-        "NameServiceUnregisterer", new EventHandler<Identifier>() {
-      @Override
-      public void onNext(final Identifier id) {
-        try {
-          nameClient.unregister(id);
-          LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
-        } catch (final Exception ex) {
-          final String msg = "Unable to unregister " + id + " with name service";
-          LOG.log(Level.WARNING, msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-      }
-    }, 5);
-  }
-
-  /** Binds this service to id and queues (id, local transport address) on the registering stage. */
-  public void registerId(final Identifier id) {
-    this.myId = id;
-    final Tuple<Identifier, InetSocketAddress> tuple =
-        new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
-    LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", new Object[]{tuple.getKey(), tuple.getValue()});
-    this.nameServiceRegisteringStage.onNext(tuple);
-  }
-
-  /** Clears the bound id (whichever id is passed) and queues id on the unregistering stage. */
-  public void unregisterId(Identifier id) {
-    this.myId = null;
-    LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", new Object[]{id, this.transport.getLocalAddress()});
-    this.nameServiceUnregisteringStage.onNext(id);
-  }
-
-  /** Returns the identifier this service is bound to, or null when none is registered. */
-  public Identifier getMyId() {
-    return this.myId;
-  }
-
-  public Transport getTransport() {
-    return this.transport;
-  }
-
-  public Codec<T> getCodec() {
-    return this.codec;
-  }
-
-  public Naming getNameClient() {
-    return this.nameClient;
-  }
-
-  public IdentifierFactory getIdentifierFactory() {
-    return this.factory;
-  }
-
-  void remove(final Identifier id) {
-    this.idToConnMap.remove(id);
-  }
-
-  /** Closes the transport, then the name client (even when the transport close throws). */
-  @Override
-  public void close() throws Exception {
-    LOG.log(Level.FINE, "Shutting down");
-    try {
-      this.transport.close();
-    } finally {
-      this.nameClient.close();
-    }
-  }
-
-  /** Returns the cached connection to destId, creating (but not opening) one if none exists. */
-  @Override
-  public Connection<T> newConnection(final Identifier destId) {
-    if (this.myId == null) {
-      throw new RuntimeException(
-          "Trying to establish a connection from a Network Service that is not bound to any task");
-    }
-    final Connection<T> conn = this.idToConnMap.get(destId);
-    if (conn != null) {
-      return conn;
-    }
-    final Connection<T> newConnection = new NSConnection<T>(
-        this.myId, destId, new LinkListener<T>() {
-      @Override
-      public void messageReceived(final Object message) {
-      }
-    }, this);
-    final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
-    return existing == null ? newConnection : existing;
-  }
-}
-
-/** Decodes the payload of each transport event into an NSMessage and forwards it to the handler. */
-class MessageHandler<T> implements EventHandler<TransportEvent> {
-
-  private final EventHandler<Message<T>> handler;
-  private final NSMessageCodec<T> codec;
-
-  public MessageHandler(final EventHandler<Message<T>> handler,
-                        final Codec<T> codec, final IdentifierFactory factory) {
-    this.codec = new NSMessageCodec<T>(codec, factory);
-    this.handler = handler;
-  }
-
-  /** Decodes the event's bytes and passes the resulting message on. */
-  @Override
-  public void onNext(final TransportEvent value) {
-    this.handler.onNext(this.codec.decode(value.getData()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
deleted file mode 100644
index 9be5d14..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.evaluator.context.events.ContextStop;
-import com.microsoft.wake.EventHandler;
-
-import javax.inject.Inject;
-
-/**
- * Closes the injected NetworkService whenever a ContextStop event arrives.
- */
-public class NetworkServiceClosingHandler implements EventHandler<ContextStop> {
-
-  private final NetworkService<?> networkService;
-
-  @Inject
-  public NetworkServiceClosingHandler(final NetworkService<?> networkService) {
-    this.networkService = networkService;
-  }
-
-  /**
-   * Closes the network service.
-   *
-   * @param contextStop the stop event; its content is not inspected
-   * @throws RuntimeException wrapping any exception raised by close()
-   */
-  @Override
-  public void onNext(final ContextStop contextStop) {
-    try {
-      this.networkService.close();
-    } catch (final Exception cause) {
-      throw new RuntimeException("Exception while closing NetworkService", cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
deleted file mode 100644
index 69488aa..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.Codec;
-
-/**
- * Container for the named parameters used to configure the network service.
- */
-public class NetworkServiceParameters {
-
-  /** String-valued parameter; declares no doc, short name or default. */
-  @NamedParameter
-  public static class TaskId implements Name<String> {
-  }
-
-  /** Identifier factory of the service; defaults to StringIdentifierFactory. */
-  @NamedParameter(
-      doc = "identifier factory for the service",
-      short_name = "factory",
-      default_class = StringIdentifierFactory.class)
-  public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
-  }
-
-  /** Port of the network service; defaults to 7070. */
-  @NamedParameter(
-      doc = "port for the network service",
-      short_name = "nsport",
-      default_value = "7070")
-  public static class NetworkServicePort implements Name<Integer> {
-  }
-
-  /** Codec of the network service; no default is declared. */
-  @NamedParameter(
-      doc = "codec for the network service",
-      short_name = "nscodec")
-  public static class NetworkServiceCodec implements Name<Codec<?>> {
-  }
-
-  /** Transport factory of the service; defaults to MessagingTransportFactory. */
-  @NamedParameter(
-      doc = "transport factory for the network service",
-      short_name = "nstransportfactory",
-      default_class = MessagingTransportFactory.class)
-  public static class NetworkServiceTransportFactory implements Name<TransportFactory> {
-  }
-
-  /** Receive handler of the network service; no default is declared. */
-  @NamedParameter(
-      doc = "network receive handler for the network service",
-      short_name = "nshandler")
-  public static class NetworkServiceHandler implements Name<EventHandler<?>> {
-  }
-
-  /** Exception handler of the network service; no default is declared. */
-  @NamedParameter(
-      doc = "network exception handler for the network service",
-      short_name = "exhandler")
-  public static class NetworkServiceExceptionHandler implements Name<EventHandler<?>> {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
deleted file mode 100644
index 34e9a85..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2013 Microsoft.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import com.microsoft.wake.remote.Codec;
-
-/**
- * A codec with additional stream-based operations, meant to make
- * serialization more efficient when an object has to pass through a chain
- * of codecs.
- *
- * @param <T> the type handled by this codec
- */
-public interface StreamingCodec<T> extends Codec<T> {
-
-  /**
-   * Stream-based encode; implementations are expected to write obj to stream.
-   *
-   * @param obj    the object to encode
-   * @param stream the output stream to encode onto
-   */
-  void encodeToStream(T obj, DataOutputStream stream);
-
-  /**
-   * Stream-based decode; implementations are expected to read an object from stream.
-   *
-   * @param stream the input stream to decode from
-   * @return the decoded object
-   */
-  T decodeFromStream(DataInputStream stream);
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
deleted file mode 100644
index 14265f6..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import javax.inject.Inject;
-
-import com.microsoft.reef.task.events.TaskStop;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-
-/**
- * On a TaskStop event, unregisters the identifier built from the task id
- * from the network service.
- */
-public class UnbindNSFromTask implements EventHandler<TaskStop> {
-
-  private final NetworkService<?> networkService;
-  private final IdentifierFactory identifierFactory;
-
-  @Inject
-  public UnbindNSFromTask(
-      final NetworkService<?> ns,
-      @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class)
-      final IdentifierFactory idFac) {
-    this.networkService = ns;
-    this.identifierFactory = idFac;
-  }
-
-  /**
-   * Turns the id of the stopped task into an identifier and unregisters it.
-   *
-   * @param task the stop event of the task
-   */
-  @Override
-  public void onNext(final TaskStop task) {
-    final NetworkService<?> service = this.networkService;
-    service.unregisterId(this.identifierFactory.getNewInstance(task.getId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
deleted file mode 100644
index 806b25a..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
deleted file mode 100644
index 34fd260..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.microsoft.reef.io.naming.NameAssignment;
-import com.microsoft.wake.Identifier;
-
-import java.net.InetSocketAddress;
-
-/**
- * Immutable (identifier, address) pair implementing NameAssignment.
- */
-public class NameAssignmentTuple implements NameAssignment {
-
-  private final Identifier identifier;
-  private final InetSocketAddress address;
-
-  /**
-   * Creates a tuple holding the given identifier and address.
-   *
-   * @param id   the identifier
-   * @param addr the Internet socket address assigned to the identifier
-   */
-  public NameAssignmentTuple(final Identifier id, final InetSocketAddress addr) {
-    this.identifier = id;
-    this.address = addr;
-  }
-
-  /**
-   * @return the identifier given at construction
-   */
-  @Override
-  public Identifier getIdentifier() {
-    return this.identifier;
-  }
-
-  /**
-   * @return the Internet socket address given at construction
-   */
-  @Override
-  public InetSocketAddress getAddress() {
-    return this.address;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
deleted file mode 100644
index 2a4fa52..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.google.common.cache.CacheBuilder;
-import com.microsoft.reef.io.network.Cache;
-import com.microsoft.wake.Identifier;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Naming cache backed by a Guava cache whose entries expire a fixed time
- * after they were written.
- */
-public class NameCache implements Cache<Identifier, InetSocketAddress> {
-
-  private final com.google.common.cache.Cache<Identifier, InetSocketAddress> cache;
-
-  /**
-   * Builds the cache.
-   *
-   * @param timeout expiry of an entry, in milliseconds after it was written
-   */
-  public NameCache(final long timeout) {
-    final CacheBuilder<Object, Object> builder =
-        CacheBuilder.newBuilder().expireAfterWrite(timeout, TimeUnit.MILLISECONDS);
-    this.cache = builder.build();
-  }
-
-  /**
-   * Returns the address cached for the identifier, delegating to the
-   * underlying cache's get(key, valueFetcher).
-   *
-   * @param key          the identifier to look up
-   * @param valueFetcher callable handed to the underlying cache to obtain the value
-   * @return the Internet socket address for the identifier
-   * @throws ExecutionException propagated from the underlying cache
-   */
-  @Override
-  public InetSocketAddress get(final Identifier key,
-                               final Callable<InetSocketAddress> valueFetcher) throws ExecutionException {
-    final InetSocketAddress address = this.cache.get(key, valueFetcher);
-    return address;
-  }
-
-  /**
-   * Drops the cached entry of the identifier.
-   *
-   * @param key the identifier whose entry is invalidated
-   */
-  @Override
-  public void invalidate(final Identifier key) {
-    this.cache.invalidate(key);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
deleted file mode 100644
index 84af42c..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.microsoft.reef.io.naming.Naming;
-import com.microsoft.reef.io.network.Cache;
-import com.microsoft.reef.io.network.naming.exception.NamingRuntimeException;
-import com.microsoft.reef.io.network.naming.serialization.NamingLookupResponse;
-import com.microsoft.reef.io.network.naming.serialization.NamingMessage;
-import com.microsoft.reef.io.network.naming.serialization.NamingRegisterResponse;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.Stage;
-import com.microsoft.wake.impl.SyncStage;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.NetUtils;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-import com.microsoft.wake.remote.transport.netty.NettyMessagingTransport;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Naming client: bundles a lookup client and a registry client that share
- * one transport.
- */
-public class NameClient implements Stage, Naming {
-  private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
-
-  /** Timeout in ms used by the constructor that takes no explicit timeout. */
-  private static final long DEFAULT_TIMEOUT_MS = 10000;
-
-  private final NameLookupClient lookupClient;
-  private final NameRegistryClient registryClient;
-  private final Transport transport;
-
-  /**
-   * Constructs a naming client with the default timeout of 10000 ms
-   *
-   * @param serverAddr a server address
-   * @param serverPort a server port number
-   * @param factory an identifier factory
-   * @param retryCount retry count handed to the transport and the lookup client
-   * @param retryTimeout retry timeout handed to the transport and the lookup client
-   * @param cache a cache
-   */
-  public NameClient(final String serverAddr, final int serverPort,
-      final IdentifierFactory factory, final int retryCount, final int retryTimeout,
-      final Cache<Identifier, InetSocketAddress> cache) {
-    this(serverAddr, serverPort, DEFAULT_TIMEOUT_MS, factory, retryCount, retryTimeout, cache);
-  }
-
-  /**
-   * Constructs a naming client
-   *
-   * @param serverAddr a server address
-   * @param serverPort a server port number
-   * @param timeout timeout in ms
-   * @param factory an identifier factory
-   * @param retryCount retry count handed to the transport and the lookup client
-   * @param retryTimeout retry timeout handed to the transport and the lookup client
-   * @param cache a cache
-   */
-  public NameClient(final String serverAddr, final int serverPort, final long timeout,
-      final IdentifierFactory factory, final int retryCount, final int retryTimeout,
-      final Cache<Identifier, InetSocketAddress> cache) {
-
-    // Responses decoded from the transport are sorted into these two queues
-    // by the NamingResponseHandler; each client consumes its own queue.
-    final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
-    final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
-    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
-
-    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
-        new SyncStage<>(new NamingClientEventHandler(
-            new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
-        null, retryCount, retryTimeout);
-
-    this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout,
-        factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache);
-
-    this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout,
-        factory, replyRegisterQueue, this.transport);
-  }
-
-  /**
-   * Registers an (identifier, address) mapping
-   *
-   * @param id an identifier
-   * @param addr an Internet socket address
-   * @throws Exception propagated from the registry client
-   */
-  @Override
-  public void register(final Identifier id, final InetSocketAddress addr)
-      throws Exception {
-    LOG.log(Level.FINE, "Register {0} : {1}", new Object[] { id, addr });
-    this.registryClient.register(id, addr);
-  }
-
-  /**
-   * Unregisters an identifier
-   *
-   * @param id an identifier
-   * @throws IOException propagated from the registry client
-   */
-  @Override
-  public void unregister(final Identifier id) throws IOException {
-    this.registryClient.unregister(id);
-  }
-
-  /**
-   * Finds an address for an identifier
-   *
-   * @param id an identifier
-   * @return an Internet socket address
-   * @throws Exception propagated from the lookup client
-   */
-  @Override
-  public InetSocketAddress lookup(final Identifier id) throws Exception {
-    return this.lookupClient.lookup(id);
-  }
-
-  /**
-   * Retrieves an address for an identifier remotely
-   *
-   * @param id an identifier
-   * @return an Internet socket address
-   * @throws Exception propagated from the lookup client
-   */
-  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
-    return this.lookupClient.remoteLookup(id);
-  }
-
-  /**
-   * Closes the lookup client, the registry client and the transport, in
-   * that order. Every resource is attempted even if an earlier close fails.
-   *
-   * @throws Exception the exception of the last close call that failed
-   */
-  @Override
-  public void close() throws Exception {
-    // Nested try/finally: a failure while closing one resource must not
-    // leave the remaining ones open.
-    try {
-      this.lookupClient.close();
-    } finally {
-      try {
-        this.registryClient.close();
-      } finally {
-        this.transport.close();
-      }
-    }
-  }
-}
-
-/**
- * Naming client transport event handler: decodes the data of each transport
- * event into a naming message and forwards it to the wrapped handler.
- */
-class NamingClientEventHandler implements EventHandler<TransportEvent> {
-
-  private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName());
-
-  private final EventHandler<NamingMessage> handler;
-  private final Codec<NamingMessage> codec;
-
-  /**
-   * @param handler receiver of the decoded naming messages
-   * @param codec   codec used to decode the transport event data
-   */
-  public NamingClientEventHandler(
-      final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
-    this.handler = handler;
-    this.codec = codec;
-  }
-
-  /**
-   * Logs the event, decodes its data and hands the message to the handler.
-   *
-   * @param value the incoming transport event
-   */
-  @Override
-  public void onNext(final TransportEvent value) {
-    // The {0} placeholder is required: without it the logger never
-    // renders the parameter and the event is missing from the log line.
-    LOG.log(Level.FINE, "Transport: {0}", value);
-    this.handler.onNext(this.codec.decode(value.getData()));
-  }
-}
-
-/**
- * Naming response message handler: sorts each incoming response into the
- * lookup queue or the register queue according to its type.
- */
-class NamingResponseHandler implements EventHandler<NamingMessage> {
-
-  private final BlockingQueue<NamingLookupResponse> replyLookupQueue;
-  private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue;
-
-  /**
-   * @param replyLookupQueue   queue receiving the lookup responses
-   * @param replyRegisterQueue queue receiving the register responses
-   */
-  NamingResponseHandler(final BlockingQueue<NamingLookupResponse> replyLookupQueue,
-      final BlockingQueue<NamingRegisterResponse> replyRegisterQueue) {
-    this.replyLookupQueue = replyLookupQueue;
-    this.replyRegisterQueue = replyRegisterQueue;
-  }
-
-  /**
-   * Offers the message to the queue matching its type.
-   *
-   * @param value the naming message to dispatch
-   * @throws NamingRuntimeException if the message is neither a lookup nor a register response
-   */
-  @Override
-  public void onNext(final NamingMessage value) {
-    if (value instanceof NamingLookupResponse) {
-      this.replyLookupQueue.offer((NamingLookupResponse) value);
-      return;
-    }
-    if (value instanceof NamingRegisterResponse) {
-      this.replyRegisterQueue.offer((NamingRegisterResponse) value);
-      return;
-    }
-    throw new NamingRuntimeException("Unknown naming response message");
-  }
-
-}