You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/05/30 18:41:51 UTC

incubator-reef git commit: [REEF-303] Clean up the construction of Wake Transport implementation. This adds a TransportFactory that returns a new Transport instance instantiated by Tang and cleans up deprecated methods/annotations.

Repository: incubator-reef
Updated Branches:
  refs/heads/master 8c4456415 -> 0f632ca86


[REEF-303] Clean up the construction of Wake Transport implementation
  This adds a TransportFactory that returns a new Transport instance
  instantiated by Tang, and cleans up deprecated methods/annotations.

JIRA:
  [REEF-303] https://issues.apache.org/jira/browse/REEF-303

Pull Request:
  Closes #195


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0f632ca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0f632ca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0f632ca8

Branch: refs/heads/master
Commit: 0f632ca864c8356e22981f598d31c89cfd7c4a94
Parents: 8c44564
Author: taegeonum <ta...@gmail.com>
Authored: Thu May 28 19:06:33 2015 +0900
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Sun May 31 01:36:54 2015 +0900

----------------------------------------------------------------------
 .../apache/reef/examples/suspend/Control.java   |   9 +-
 .../reef/examples/suspend/SuspendClient.java    |  12 +-
 .../examples/suspend/SuspendClientControl.java  |   8 +-
 .../reef/io/network/TransportFactory.java       |  43 -------
 .../group/impl/driver/GroupCommDriverImpl.java  |  25 +++-
 .../network/impl/MessagingTransportFactory.java |  77 ------------
 .../reef/io/network/impl/NetworkService.java    |  25 ++--
 .../network/impl/NetworkServiceParameters.java  |   3 +-
 .../reef/io/network/naming/NameClient.java      |  30 ++++-
 .../io/network/naming/NameLookupClient.java     |  34 ++++-
 .../io/network/naming/NameRegistryClient.java   |  18 ++-
 .../reef/io/network/naming/NameServerImpl.java  |  54 ++++++--
 .../services/network/NetworkServiceTest.java    |   3 +-
 .../reef/wake/remote/RemoteConfiguration.java   |  13 ++
 .../impl/DefaultRemoteManagerFactory.java       |  17 +--
 .../DefaultRemoteManagerImplementation.java     |  17 ++-
 .../remote/impl/DefaultTransportEStage.java     |  38 ++++++
 .../wake/remote/transport/TransportFactory.java |  86 +++++++++++++
 .../netty/MessagingTransportFactory.java        | 124 +++++++++++++++++++
 .../netty/NettyMessagingTransport.java          |  60 +++++++--
 .../reef/wake/test/remote/LargeMsgTest.java     |  12 +-
 .../reef/wake/test/remote/RemoteTest.java       |  24 ++--
 .../wake/test/remote/SmallMessagesTest.java     |  15 ++-
 .../wake/test/remote/TransportRaceTest.java     |  16 ++-
 .../reef/wake/test/remote/TransportTest.java    |  17 ++-
 25 files changed, 549 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
index db312ce..c5cf879 100644
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
@@ -35,7 +35,7 @@ import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Link;
 import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import javax.inject.Inject;
 import java.io.IOException;
@@ -49,14 +49,17 @@ public final class Control {
   private final transient String command;
   private final transient String taskId;
   private final transient int port;
+  private final TransportFactory tpFactory;
 
   @Inject
   public Control(@Parameter(SuspendClientControl.Port.class) final int port,
                  @Parameter(TaskId.class) final String taskId,
-                 @Parameter(Command.class) final String command) {
+                 @Parameter(Command.class) final String command,
+                 final TransportFactory tpFactory) {
     this.command = command.trim().toLowerCase();
     this.taskId = taskId;
     this.port = port;
+    this.tpFactory = tpFactory;
   }
 
   private static Configuration getConfig(final String[] args) throws IOException, BindException {
@@ -87,7 +90,7 @@ public final class Control {
       }
     });
 
-    try (final Transport transport = new NettyMessagingTransport("localhost", 0, stage, stage, 1, 10000)) {
+    try (final Transport transport = tpFactory.newInstance("localhost", 0, stage, stage, 1, 10000)) {
       final Link link = transport.open(new InetSocketAddress("localhost", this.port), codec, null);
       link.write(this.command + " " + this.taskId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
index e57e9b5..f7dbb59 100644
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
@@ -57,15 +57,15 @@ public class SuspendClient {
   private final SuspendClientControl controlListener;
 
   /**
-   * @param reef      reference to the REEF framework.
-   * @param port      port to listen to for suspend/resume commands.
-   * @param numCycles number of cycles to run in the task.
-   * @param delay     delay in seconds between cycles in the task.
+   * @param reef                 reference to the REEF framework.
+   * @param controlListener      suspend client control listener.
+   * @param numCycles            number of cycles to run in the task.
+   * @param delay                delay in seconds between cycles in the task.
    */
   @Inject
   SuspendClient(
       final REEF reef,
-      final @Parameter(SuspendClientControl.Port.class) int port,
+      final SuspendClientControl controlListener,
       final @Parameter(Launch.NumCycles.class) int numCycles,
       final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException {
 
@@ -89,7 +89,7 @@ public class SuspendClient {
 
     this.driverConfig = cb.build();
     this.reef = reef;
-    this.controlListener = new SuspendClientControl(port);
+    this.controlListener = controlListener;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
index accb5a8..7f5e67f 100644
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
@@ -28,7 +28,7 @@ import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import javax.inject.Inject;
 import java.io.IOException;
@@ -45,9 +45,11 @@ public class SuspendClientControl implements AutoCloseable {
   private final transient Transport transport;
   private transient RunningJob runningJob;
 
+
   @Inject
   public SuspendClientControl(
-      final @Parameter(SuspendClientControl.Port.class) int port) throws IOException {
+      final @Parameter(SuspendClientControl.Port.class) int port,
+      final TransportFactory tpFactory) throws IOException {
 
     LOG.log(Level.INFO, "Listen to control port {0}", port);
 
@@ -59,7 +61,7 @@ public class SuspendClientControl implements AutoCloseable {
       }
     });
 
-    this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000);
+    this.transport = tpFactory.newInstance("localhost", port, stage, stage, 1, 10000);
   }
 
   public synchronized void setRunningJob(final RunningJob job) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
deleted file mode 100644
index 1443a46..0000000
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.io.network;
-
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.transport.Transport;
-
-/**
- * Factory that creates a transport
- */
-public interface TransportFactory {
-
-  /**
-   * Creates a transport
-   *
-   * @param port          a listening port
-   * @param clientHandler a transport client-side handler
-   * @param serverHandler a transport server-side handler
-   * @param exHandler     an exception handler
-   * @return
-   */
-  public Transport create(int port,
-                          EventHandler<TransportEvent> clientHandler,
-                          EventHandler<TransportEvent> serverHandler,
-                          EventHandler<Exception> exHandler);
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index adcb1e5..e453640 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -26,10 +26,6 @@ import org.apache.reef.driver.parameters.DriverIdentifier;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.impl.*;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.io.network.naming.NameServerImpl;
-import org.apache.reef.io.network.naming.NameServerParameters;
 import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
 import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
@@ -39,6 +35,10 @@ import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOu
 import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
 import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
 import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.*;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.NameServerImpl;
+import org.apache.reef.io.network.naming.NameServerParameters;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.JavaConfigurationBuilder;
@@ -56,6 +56,8 @@ import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import javax.inject.Inject;
 import java.util.HashMap;
@@ -124,6 +126,19 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
                              @Parameter(DriverIdentifier.class) final String driverId,
                              @Parameter(TreeTopologyFanOut.class) final int fanOut,
                              final LocalAddressProvider localAddressProvider) {
+    this(confSerializer, driverId, fanOut, localAddressProvider, new MessagingTransportFactory());
+  }
+
+  /**
+   * @deprecated Have an instance injected instead.
+   */
+  @Deprecated
+  @Inject
+  public GroupCommDriverImpl(final ConfigurationSerializer confSerializer,
+                             @Parameter(DriverIdentifier.class) final String driverId,
+                             @Parameter(TreeTopologyFanOut.class) final int fanOut,
+                             final LocalAddressProvider localAddressProvider,
+                             final TransportFactory tpFactory) {
     assert (SingletonAsserter.assertSingleton(getClass()));
     this.driverId = driverId;
     this.fanOut = fanOut;
@@ -141,7 +156,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
     this.groupCommMessageHandler = new GroupCommMessageHandler();
     this.groupCommMessageStage = new SingleThreadStage<>("GroupCommMessageStage", groupCommMessageHandler, 100 * 1000);
     this.netService = new NetworkService<>(idFac, 0, nameServiceAddr, nameServicePort,
-        new GroupCommunicationMessageCodec(), new MessagingTransportFactory(localAddressProvider),
+        new GroupCommunicationMessageCodec(), tpFactory,
         new EventHandler<Message<GroupCommunicationMessage>>() {
 
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
deleted file mode 100644
index f9391bd..0000000
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.io.network.impl;
-
-import org.apache.reef.io.network.TransportFactory;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.impl.SyncStage;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
-import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
-
-import javax.inject.Inject;
-
-/**
- * Factory that creates a messaging transport
- */
-public class MessagingTransportFactory implements TransportFactory {
-
-  private final String localAddress;
-
-  /**
-   * @deprecated Have an instance injected instead.
-   */
-  @Deprecated
-  @Inject
-  public MessagingTransportFactory(final LocalAddressProvider localAddressProvider) {
-    this.localAddress = localAddressProvider.getLocalAddress();
-  }
-
-  /**
-   * @deprecated Have an instance injected instead.
-   */
-  @Deprecated
-  public MessagingTransportFactory() {
-    this.localAddress = LocalAddressProviderFactory.getInstance().getLocalAddress();
-  }
-
-  /**
-   * Creates a transport
-   *
-   * @param port          a listening port
-   * @param clientHandler a transport client side handler
-   * @param serverHandler a transport server side handler
-   * @param exHandler     a exception handler
-   */
-  @Override
-  public Transport create(final int port,
-                          final EventHandler<TransportEvent> clientHandler,
-                          final EventHandler<TransportEvent> serverHandler,
-                          final EventHandler<Exception> exHandler) {
-
-    final Transport transport = new NettyMessagingTransport(this.localAddress,
-        port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000, RangeTcpPortProvider.Default);
-
-    transport.registerErrorHandler(exHandler);
-    return transport;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
index 269fbf3..3709f43 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -23,7 +23,7 @@ import org.apache.reef.io.naming.Naming;
 import org.apache.reef.io.network.Connection;
 import org.apache.reef.io.network.ConnectionFactory;
 import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.io.network.naming.NameCache;
 import org.apache.reef.io.network.naming.NameClient;
 import org.apache.reef.io.network.naming.NameLookupClient;
@@ -125,18 +125,17 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
    * @deprecated have an instance injected instead.
    */
   @Deprecated
-  @Inject
   public NetworkService(
-      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory,
-      final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort,
-      final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr,
-      final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort,
-      final @Parameter(NameLookupClient.RetryCount.class) int retryCount,
-      final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout,
-      final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec,
-      final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory,
-      final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler,
-      final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) {
+      final IdentifierFactory factory,
+      final int nsPort,
+      final String nameServerAddr,
+      final int nameServerPort,
+      final int retryCount,
+      final int retryTimeout,
+      final Codec<T> codec,
+      final TransportFactory tpFactory,
+      final EventHandler<Message<T>> recvHandler,
+      final EventHandler<Exception> exHandler) {
     this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler,
         LocalAddressProviderFactory.getInstance());
   }
@@ -161,7 +160,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
 
     this.factory = factory;
     this.codec = codec;
-    this.transport = tpFactory.create(nsPort,
+    this.transport = tpFactory.newInstance(nsPort,
         new LoggingEventHandler<TransportEvent>(),
         new MessageHandler<T>(recvHandler, codec, factory), exHandler);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
index a31c5de..dd5e8a1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -18,7 +18,8 @@
  */
 package org.apache.reef.io.network.impl;
 
-import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
index 52688aa..3efef65 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -19,11 +19,11 @@
 package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.Naming;
-import org.apache.reef.util.cache.Cache;
 import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
 import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
 import org.apache.reef.io.network.naming.serialization.NamingMessage;
 import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.util.cache.Cache;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
@@ -34,7 +34,8 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -117,12 +118,35 @@ public class NameClient implements Stage, Naming {
                     final int retryTimeout,
                     final Cache<Identifier, InetSocketAddress> cache,
                     final LocalAddressProvider localAddressProvider) {
+     this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout,
+         cache, localAddressProvider, new MessagingTransportFactory());
+  }
+
+  /**
+   * Constructs a naming client
+   *
+   * @param serverAddr a server address
+   * @param serverPort a server port number
+   * @param timeout    timeout in ms
+   * @param factory    an identifier factory
+   * @param cache      a cache
+   * @param tpFactory  transport factory
+   */
+  public NameClient(final String serverAddr,
+                    final int serverPort,
+                    final long timeout,
+                    final IdentifierFactory factory,
+                    final int retryCount,
+                    final int retryTimeout,
+                    final Cache<Identifier, InetSocketAddress> cache,
+                    final LocalAddressProvider localAddressProvider,
+                    final TransportFactory tpFactory) {
 
     final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
     final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
     final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
 
-    this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0,
+    this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0,
         new SyncStage<>(new NamingClientEventHandler(
             new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
         null, retryCount, retryTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
index a402ebb..fdef6f0 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -20,13 +20,13 @@ package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.NameAssignment;
 import org.apache.reef.io.naming.NamingLookup;
-import org.apache.reef.util.cache.Cache;
 import org.apache.reef.io.network.naming.exception.NamingException;
 import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
 import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
 import org.apache.reef.io.network.naming.serialization.NamingMessage;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.util.cache.Cache;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
@@ -39,7 +39,8 @@ import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Link;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -93,7 +94,6 @@ public class NameLookupClient implements Stage, NamingLookup {
    * @param factory    an identifier factory
    * @param cache      an cache
    */
-  @Deprecated
   public NameLookupClient(final String serverAddr,
                           final int serverPort,
                           final IdentifierFactory factory,
@@ -139,6 +139,29 @@ public class NameLookupClient implements Stage, NamingLookup {
                           final int retryTimeout,
                           final Cache<Identifier, InetSocketAddress> cache,
                           final LocalAddressProvider localAddressProvider) {
+    this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout,
+        cache, localAddressProvider, new MessagingTransportFactory());
+  }
+
+  /**
+   * Constructs a naming lookup client
+   *
+   * @param serverAddr a server address
+   * @param serverPort a server port number
+   * @param timeout    request timeout in ms
+   * @param factory    an identifier factory
+   * @param cache      a cache
+   * @param tpFactory  a transport factory
+   */
+  public NameLookupClient(final String serverAddr,
+                          final int serverPort,
+                          final long timeout,
+                          final IdentifierFactory factory,
+                          final int retryCount,
+                          final int retryTimeout,
+                          final Cache<Identifier, InetSocketAddress> cache,
+                          final LocalAddressProvider localAddressProvider,
+                          final TransportFactory tpFactory) {
 
     this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
     this.timeout = timeout;
@@ -146,7 +169,7 @@ public class NameLookupClient implements Stage, NamingLookup {
     this.codec = NamingCodecFactory.createLookupCodec(factory);
     this.replyQueue = new LinkedBlockingQueue<>();
 
-    this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0,
+    this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0,
         new SyncStage<>(new NamingLookupClientHandler(
             new NamingLookupResponseHandler(this.replyQueue), this.codec)),
         null, retryCount, retryTimeout);
@@ -154,7 +177,6 @@ public class NameLookupClient implements Stage, NamingLookup {
     this.retryCount = retryCount;
     this.retryTimeout = retryTimeout;
   }
-
   NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
                    final IdentifierFactory factory, final int retryCount, final int retryTimeout,
                    final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport,
@@ -170,6 +192,8 @@ public class NameLookupClient implements Stage, NamingLookup {
     this.retryTimeout = retryTimeout;
   }
 
+
+
   /**
    * Finds an address for an identifier
    *

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
index ba48ed7..24056dd 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
@@ -24,12 +24,16 @@ import org.apache.reef.io.network.naming.serialization.NamingMessage;
 import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest;
 import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
 import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.Stage;
 import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.RemoteConfiguration;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
@@ -90,9 +94,17 @@ public class NameRegistryClient implements Stage, NamingRegistry {
     this.timeout = timeout;
     this.codec = NamingCodecFactory.createRegistryCodec(factory);
     this.replyQueue = new LinkedBlockingQueue<>();
-    this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0,
-        new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)),
-        null, 3, 10000);
+
+    Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class,
+        new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)));
+
+    try {
+      this.transport = injector.getInstance(NettyMessagingTransport.class);
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index c541c73..77c012a 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -20,19 +20,23 @@ package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.NameAssignment;
 import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.impl.MultiEventHandler;
 import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.RemoteConfiguration;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
 import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 import org.apache.reef.webserver.AvroReefServiceInfo;
 import org.apache.reef.webserver.ReefEventStateManager;
@@ -69,13 +73,22 @@ public class NameServerImpl implements NameServer {
       final IdentifierFactory factory,
       final LocalAddressProvider localAddressProvider) {
 
+    Injector injector = Tang.Factory.getTang().newInjector();
+
     this.localAddressProvider = localAddressProvider;
     this.reefEventStateManager = null;
     final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
     final EventHandler<NamingMessage> handler = createEventHandler(codec);
 
-    this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), port, null,
-        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
+    injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(new NamingServerHandler(handler, codec)));
+
+    try {
+      this.transport = injector.getInstance(NettyMessagingTransport.class);
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
 
     this.port = transport.getListeningPort();
     this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
@@ -100,15 +113,36 @@ public class NameServerImpl implements NameServer {
       final int port,
       final IdentifierFactory factory,
       final ReefEventStateManager reefEventStateManager) {
-    this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance(), RangeTcpPortProvider.Default);
+    this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance());
+  }
+
+  /**
+   * Constructs a name server
+   *
+   * @param port                  a listening port number
+   * @param factory               an identifier factory
+   * @param reefEventStateManager the event state manager used to register name server info
+   * @param localAddressProvider  a local address provider
+   * @deprecated have an instance injected instead
+   */
+  @Deprecated
+  public NameServerImpl(
+      final int port,
+      final IdentifierFactory factory,
+      final ReefEventStateManager reefEventStateManager,
+      final LocalAddressProvider localAddressProvider) {
+    this(port, factory, reefEventStateManager, localAddressProvider, new MessagingTransportFactory());
   }
 
+
   /**
    * Constructs a name server
    *
    * @param port                  a listening port number
    * @param factory               an identifier factory
    * @param reefEventStateManager the event state manager used to register name server info
+   * @param localAddressProvider  a local address provider
+   * @param tpFactory             a transport factory
    */
   @Inject
   public NameServerImpl(
@@ -116,16 +150,14 @@ public class NameServerImpl implements NameServer {
       final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
       final ReefEventStateManager reefEventStateManager,
       final LocalAddressProvider localAddressProvider,
-      final TcpPortProvider tcpPortProvider) {
-
+      final TransportFactory tpFactory) {
     this.localAddressProvider = localAddressProvider;
-
     this.reefEventStateManager = reefEventStateManager;
     final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
     final EventHandler<NamingMessage> handler = createEventHandler(codec);
 
-    this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), port, null,
-        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000, tcpPortProvider);
+    this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), port, null,
+        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
 
     this.port = transport.getListeningPort();
     this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
@@ -138,6 +170,7 @@ public class NameServerImpl implements NameServer {
     LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
   }
 
+
   private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
 
     final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
@@ -150,7 +183,6 @@ public class NameServerImpl implements NameServer {
 
     return handler;
   }
-
   /**
    * Gets port
    */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index f68ef5a..9976026 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -21,14 +21,13 @@ package org.apache.reef.services.network;
 import org.apache.reef.exception.evaluator.NetworkException;
 import org.apache.reef.io.network.Connection;
 import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.impl.MessagingTransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
 import org.apache.reef.io.network.impl.NetworkService;
 import org.apache.reef.io.network.naming.NameServer;
 import org.apache.reef.io.network.naming.NameServerImpl;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.services.network.util.Monitor;
 import org.apache.reef.services.network.util.StringCodec;
-import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java
index f438239..0f433d4 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.reef.wake.remote;
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.DefaultTransportEStage;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
 
 /**
  * Configuration options and helper methods for Wake remoting.
@@ -68,4 +71,14 @@ public final class RemoteConfiguration {
   public static final class RetryTimeout implements Name<Integer> {
     // Intentionally empty       
   }
+
+  @NamedParameter(doc = "Client stage for messaging transport", default_class = DefaultTransportEStage.class)
+  public static final class RemoteClientStage implements Name<EStage<TransportEvent>> {
+    // Intentionally empty
+  }
+
+  @NamedParameter(doc = "Server stage for messaging transport", default_class = DefaultTransportEStage.class)
+  public static final class RemoteServerStage implements Name<EStage<TransportEvent>> {
+    // Intentionally empty
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
index e36b4dc..bb11751 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
@@ -26,6 +26,7 @@ import org.apache.reef.wake.remote.RemoteManager;
 import org.apache.reef.wake.remote.RemoteManagerFactory;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 
 import javax.inject.Inject;
 
@@ -40,7 +41,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
   private final int numberOfTries;
   private final int retryTimeout;
   private final LocalAddressProvider localAddressProvider;
-  private final TcpPortProvider tcpPortProvider;
+  private final TransportFactory tpFactory;
 
   @Inject
   private DefaultRemoteManagerFactory(
@@ -50,14 +51,14 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
       final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries,
       final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout,
       final LocalAddressProvider localAddressProvider,
-      final TcpPortProvider tcpPortProvider) {
+      final TransportFactory tpFactory) {
     this.codec = codec;
     this.errorHandler = errorHandler;
     this.orderingGuarantee = orderingGuarantee;
     this.numberOfTries = numberOfTries;
     this.retryTimeout = retryTimeout;
     this.localAddressProvider = localAddressProvider;
-    this.tcpPortProvider = tcpPortProvider;
+    this.tpFactory = tpFactory;
   }
 
   @Override
@@ -71,7 +72,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
         this.numberOfTries,
         this.retryTimeout,
         this.localAddressProvider,
-        this.tcpPortProvider);
+        this.tpFactory);
   }
 
 
@@ -95,7 +96,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
         numberOfTries,
         retryTimeout,
         localAddressProvider,
-        tcpPortProvider);
+        tpFactory);
   }
 
   @Override
@@ -116,7 +117,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
         numberOfTries,
         retryTimeout,
         this.localAddressProvider,
-        this.tcpPortProvider);
+        this.tpFactory);
 
   }
 
@@ -131,7 +132,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
         this.numberOfTries,
         this.retryTimeout,
         this.localAddressProvider,
-        this.tcpPortProvider);
+        this.tpFactory);
   }
 
   @Override
@@ -148,6 +149,6 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
         this.numberOfTries,
         this.retryTimeout,
         this.localAddressProvider,
-        this.tcpPortProvider);
+        this.tpFactory);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index c53d063..7fbe59e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -25,9 +25,9 @@ import org.apache.reef.wake.impl.StageManager;
 import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
 import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 
 import javax.inject.Inject;
@@ -64,8 +64,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager {
   /**
    * Indicates a hostname that isn't set or known.
    */
-  public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
-
+  public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
 
   /**
    * @deprecated have an instance injected instead. Or use RemoteManagerFactory.getInstance()
@@ -89,8 +88,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager {
         numberOfTries,
         retryTimeout,
         LocalAddressProviderFactory.getInstance(),
-        RangeTcpPortProvider.Default);
-
+        new MessagingTransportFactory());
   }
 
   /**
@@ -108,7 +106,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager {
       final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries,
       final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout,
       final LocalAddressProvider localAddressProvider,
-      final TcpPortProvider tcpPortProvider) {
+      final TransportFactory tpFactory) {
 
     this.name = name;
     this.handlerContainer = new HandlerContainer<>(name, codec);
@@ -117,9 +115,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager {
         new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
         new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
 
-    final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
-    this.transport = new NettyMessagingTransport(
-            host, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider);
+    this.transport = tpFactory.newInstance(
+        hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
 
     this.handlerContainer.setTransport(this.transport);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
new file mode 100644
index 0000000..8ab669e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.impl;
+
+import org.apache.reef.wake.EStage;
+
+import javax.inject.Inject;
+
+public class DefaultTransportEStage implements EStage<TransportEvent> {
+
+  @Inject
+  public DefaultTransportEStage() {
+  }
+
+  @Override
+  public void onNext(TransportEvent value) {
+  }
+
+  @Override
+  public void close() throws Exception {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java
new file mode 100644
index 0000000..a09339b
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.transport;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+
+/**
+ * Factory that creates a transport
+ */
+@DefaultImplementation(MessagingTransportFactory.class)
+public interface TransportFactory {
+
+  /**
+   * Creates a transport
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client-side handler
+   * @param serverHandler a transport server-side handler
+   * @param exHandler     an exception handler
+   * @return transport
+   */
+  public Transport newInstance(int port,
+                          EventHandler<TransportEvent> clientHandler,
+                          EventHandler<TransportEvent> serverHandler,
+                          EventHandler<Exception> exHandler);
+
+  /**
+   * Creates a transport
+   *
+   * @param hostAddress     a host address
+   * @param port            a listening port
+   * @param clientStage     a transport client-side stage
+   * @param serverStage     a transport server-side stage
+   * @param numberOfTries   the number of retries for connection
+   * @param retryTimeout    retry timeout
+   * @return transport
+   */
+  public Transport newInstance(final String hostAddress, int port,
+                               final EStage<TransportEvent> clientStage,
+                               final EStage<TransportEvent> serverStage,
+                               final int numberOfTries,
+                               final int retryTimeout);
+
+  /**
+   * Creates a transport
+   *
+   * @param hostAddress     a host address
+   * @param port            a listening port
+   * @param clientStage     a transport client-side stage
+   * @param serverStage     a transport server-side stage
+   * @param numberOfTries   the number of retries for connection
+   * @param retryTimeout    retry timeout
+   * @param tcpPortProvider gives an iterator that produces random tcp ports in a range
+   * @return transport
+   */
+  public Transport newInstance(final String hostAddress,
+                               int port,
+                               final EStage<TransportEvent> clientStage,
+                               final EStage<TransportEvent> serverStage,
+                               final int numberOfTries,
+                               final int retryTimeout,
+                               final TcpPortProvider tcpPortProvider);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
new file mode 100644
index 0000000..b77f804
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.transport.netty;
+
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+
+import javax.inject.Inject;
+
+/**
+ * Factory that creates a messaging transport
+ */
+public class MessagingTransportFactory implements TransportFactory {
+
+  private final String localAddress;
+
+  /**
+   * @deprecated Have an instance injected instead.
+   */
+  @Deprecated
+  @Inject
+  public MessagingTransportFactory(final LocalAddressProvider localAddressProvider) {
+    this.localAddress = localAddressProvider.getLocalAddress();
+  }
+
+  /**
+   * @deprecated Have an instance injected instead.
+   */
+  @Deprecated
+  public MessagingTransportFactory() {
+    this.localAddress = LocalAddressProviderFactory.getInstance().getLocalAddress();
+  }
+
+  /**
+   * Creates a transport
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client side handler
+   * @param serverHandler a transport server side handler
+   * @param exHandler     an exception handler
+   */
+  @Override
+  public Transport newInstance(final int port,
+                               final EventHandler<TransportEvent> clientHandler,
+                               final EventHandler<TransportEvent> serverHandler,
+                               final EventHandler<Exception> exHandler) {
+
+    Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress);
+    injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler));
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(serverHandler));
+
+    final Transport transport;
+    try {
+      transport = injector.getInstance(NettyMessagingTransport.class);
+      transport.registerErrorHandler(exHandler);
+      return transport;
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Transport newInstance(final String hostAddress, int port,
+                               final EStage<TransportEvent> clientStage,
+                               final EStage<TransportEvent> serverStage,
+                               final int numberOfTries,
+                               final int retryTimeout) {
+    return newInstance(hostAddress, port, clientStage,
+        serverStage, numberOfTries, retryTimeout, RangeTcpPortProvider.Default);
+  }
+
+  @Override
+  public Transport newInstance(final String hostAddress, int port,
+                               final EStage<TransportEvent> clientStage,
+                               final EStage<TransportEvent> serverStage,
+                               final int numberOfTries,
+                               final int retryTimeout,
+                               final TcpPortProvider tcpPortProvider) {
+
+    Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
+    injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage);
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage);
+    injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
+    injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
+    injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
+    try {
+      return injector.getInstance(NettyMessagingTransport.class);
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index a1f0886..dff38bd 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -30,10 +30,14 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.DefaultThreadFactory;
 import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
@@ -43,6 +47,7 @@ import org.apache.reef.wake.remote.transport.LinkListener;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException;
 
+import javax.inject.Inject;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.ConnectException;
@@ -88,6 +93,11 @@ public class NettyMessagingTransport implements Transport {
 
   private final int numberOfTries;
   private final int retryTimeout;
+  /**
+   * Indicates a hostname that isn't set or known.
+   */
+  public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
+
 
   /**
    * Constructs a messaging transport
@@ -99,19 +109,49 @@ public class NettyMessagingTransport implements Transport {
    * @param numberOfTries the number of tries of connection
    * @param retryTimeout  the timeout of reconnection
    * @param tcpPortProvider  gives an iterator that produces random tcp ports in a range
+   * @deprecated use TransportFactory#newInstance (obtained via Tang injection) instead; the constructor that takes a LocalAddressProvider is package-private.
+   */
+  @Deprecated
+  public NettyMessagingTransport(
+      final String hostAddress,
+      int port,
+      final EStage<TransportEvent> clientStage,
+      final EStage<TransportEvent> serverStage,
+      final int numberOfTries,
+      final int retryTimeout,
+      final TcpPortProvider tcpPortProvider) {
+
+    this(hostAddress, port, clientStage, serverStage, numberOfTries,
+        retryTimeout, tcpPortProvider, LocalAddressProviderFactory.getInstance());
+  }
+  /**
+   * Constructs a messaging transport
    *
+   * @param hostAddress   the server host address
+   * @param port          the server listening port; when it is 0, randomly assign a port number
+   * @param clientStage   the client-side stage that handles transport events
+   * @param serverStage   the server-side stage that handles transport events
+   * @param numberOfTries the number of tries of connection
+   * @param retryTimeout  the timeout of reconnection
+   * @param tcpPortProvider  gives an iterator that produces random tcp ports in a range
    */
-  public NettyMessagingTransport(final String hostAddress, int port,
-                                 final EStage<TransportEvent> clientStage,
-                                 final EStage<TransportEvent> serverStage,
-                                 final int numberOfTries,
-                                 final int retryTimeout,
-                                 final TcpPortProvider tcpPortProvider) {
+  @Inject
+  NettyMessagingTransport(
+      final @Parameter(RemoteConfiguration.HostAddress.class) String hostAddress,
+      @Parameter(RemoteConfiguration.Port.class) int port,
+      final @Parameter(RemoteConfiguration.RemoteClientStage.class) EStage<TransportEvent> clientStage,
+      final @Parameter(RemoteConfiguration.RemoteServerStage.class) EStage<TransportEvent> serverStage,
+      final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries,
+      final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout,
+      final TcpPortProvider tcpPortProvider,
+      final LocalAddressProvider localAddressProvider) {
 
     if (port < 0) {
       throw new RemoteRuntimeException("Invalid server port: " + port);
     }
 
+    final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
+
     this.numberOfTries = numberOfTries;
     this.retryTimeout = retryTimeout;
     this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage);
@@ -143,7 +183,7 @@ public class NettyMessagingTransport implements Transport {
   Channel acceptor = null;
   try {
     if (port > 0) {
-      acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
+      acceptor = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel();
     } else {
       Iterator<Integer> ports = tcpPortProvider.iterator();
       while (acceptor == null) {
@@ -151,7 +191,7 @@ public class NettyMessagingTransport implements Transport {
         port = ports.next();
         LOG.log(Level.FINEST, "Try port {0}", port);
         try {
-          acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
+          acceptor = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel();
         } catch (final Exception ex) {
           if (ex instanceof BindException) {
             LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port);
@@ -174,7 +214,7 @@ public class NettyMessagingTransport implements Transport {
 
     this.acceptor = acceptor;
     this.serverPort = port;
-    this.localAddress = new InetSocketAddress(hostAddress, this.serverPort);
+    this.localAddress = new InetSocketAddress(host, this.serverPort);
 
     LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
   }
@@ -188,7 +228,7 @@ public class NettyMessagingTransport implements Transport {
    * @param serverStage   the server-side stage that handles transport events
    * @param numberOfTries the number of tries of connection
    * @param retryTimeout  the timeout of reconnection
-   * @deprecated use the constructor that takes a TcpProvider instead
+   * @deprecated use the constructor that takes a TcpPortProvider and LocalAddressProvider instead.
    */
   @Deprecated
   public NettyMessagingTransport(final String hostAddress, int port,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
index 3defe01..c0c6a01 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EStage;
@@ -27,10 +28,10 @@ import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.impl.TimerStage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Link;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.PassThroughEncoder;
 import org.apache.reef.wake.test.util.TimeoutHandler;
@@ -46,13 +47,16 @@ import java.util.logging.Level;
  */
 public class LargeMsgTest {
   private final LocalAddressProvider localAddressProvider;
+  private final TransportFactory tpFactory;
   private final static byte[][] values = new byte[3][];
   private final static int l0 = 1 << 25;
   private final static int l1 = 1 << 2;
   private final static int l2 = 1 << 21;
 
   public LargeMsgTest() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    this.tpFactory = injector.getInstance(TransportFactory.class);
   }
 
   @BeforeClass
@@ -87,7 +91,7 @@ public class LargeMsgTest {
 
     final String hostAddress = this.localAddressProvider.getLocalAddress();
     int port = 7001;
-    NettyMessagingTransport transport = new NettyMessagingTransport(hostAddress, port, clientStage, serverStage, 1, 10000);
+    Transport transport = tpFactory.newInstance(hostAddress, port, clientStage, serverStage, 1, 10000);
     final Link<byte[]> link = transport.open(new InetSocketAddress(hostAddress, port), new PassThroughEncoder(), null);
     EStage<byte[]> writeSubmitter = new ThreadPoolStage<>("Submitter", new EventHandler<byte[]>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java
index 636315f..35f879a 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java
@@ -18,17 +18,22 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingEventHandler;
 import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.MultiEventHandler;
 import org.apache.reef.wake.impl.TimerStage;
-import org.apache.reef.wake.remote.*;
+import org.apache.reef.wake.remote.Decoder;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.RemoteIdentifier;
+import org.apache.reef.wake.remote.RemoteIdentifierFactory;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.*;
 import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.TimeoutHandler;
 import org.junit.Assert;
@@ -44,14 +49,17 @@ import java.util.logging.Level;
 
 public class RemoteTest {
   private final LocalAddressProvider localAddressProvider;
+  private final TransportFactory tpFactory;
   @Rule
   public final TestName name = new TestName();
 
   final String logPrefix = "TEST ";
 
 
-  public RemoteTest() {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  public RemoteTest() throws InjectionException {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    this.tpFactory = injector.getInstance(TransportFactory.class);
   }
 
   @Test
@@ -87,10 +95,10 @@ public class RemoteTest {
     final String hostAddress = this.localAddressProvider.getLocalAddress();
 
     // transport
-    Transport transport1 = new NettyMessagingTransport(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000);
+    Transport transport1 = tpFactory.newInstance(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000);
     int port1 = transport1.getListeningPort();
 
-    Transport transport2 = new NettyMessagingTransport(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000);
+    Transport transport2 = tpFactory.newInstance(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000);
     int port2 = transport2.getListeningPort();
 
     transport1.close();
@@ -132,7 +140,7 @@ public class RemoteTest {
     final String hostAddress = this.localAddressProvider.getLocalAddress();
 
     // transport
-    Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000);
+    Transport transport = tpFactory.newInstance(hostAddress, port, reRecvStage, reRecvStage, 1, 10000);
 
     // mux encoder with encoder map
     Map<Class<?>, Encoder<?>> clazzToEncoderMap = new HashMap<Class<?>, Encoder<?>>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java
index fdced04..6e93d67 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.MultiEventHandler;
@@ -27,10 +30,9 @@ import org.apache.reef.wake.remote.Encoder;
 import org.apache.reef.wake.remote.RemoteIdentifier;
 import org.apache.reef.wake.remote.RemoteIdentifierFactory;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.*;
 import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.TimeoutHandler;
 import org.junit.Assert;
@@ -43,9 +45,12 @@ import java.util.logging.Level;
 
 public class SmallMessagesTest {
   private final LocalAddressProvider localAddressProvider;
+  private final TransportFactory tpFactory;
 
-  public SmallMessagesTest() {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  public SmallMessagesTest() throws InjectionException {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    this.tpFactory = injector.getInstance(TransportFactory.class);
   }
 
   @Rule
@@ -87,7 +92,7 @@ public class SmallMessagesTest {
     final String hostAddress = this.localAddressProvider.getLocalAddress();
 
     // transport
-    Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000);
+    Transport transport = tpFactory.newInstance(hostAddress, port, reRecvStage, reRecvStage, 1, 10000);
 
     // mux encoder with encoder map
     Map<Class<?>, Encoder<?>> clazzToEncoderMap = new HashMap<Class<?>, Encoder<?>>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java
index bdb480d..b8fd236 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingEventHandler;
@@ -25,10 +28,10 @@ import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.impl.TimerStage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Link;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.PassThroughEncoder;
 import org.apache.reef.wake.test.util.TimeoutHandler;
@@ -41,9 +44,12 @@ import java.util.logging.Level;
 
 public class TransportRaceTest {
   private final LocalAddressProvider localAddressProvider;
+  private final TransportFactory tpFactory;
 
-  public TransportRaceTest() {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  public TransportRaceTest() throws InjectionException {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    this.tpFactory = injector.getInstance(TransportFactory.class);
   }
 
   @Test
@@ -60,7 +66,7 @@ public class TransportRaceTest {
         serverHandler, 1, new LoggingEventHandler<Throwable>());
     final String hostAddress = this.localAddressProvider.getLocalAddress();
     int port = 7001;
-    NettyMessagingTransport transport = new NettyMessagingTransport(
+    Transport transport = tpFactory.newInstance(
         hostAddress, port, clientStage, serverStage, 1, 10000);
 
     String value = "Test Race";

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java
index 8176a70..3ad767a 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java
@@ -18,18 +18,20 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.TimerStage;
 import org.apache.reef.wake.remote.Codec;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.Link;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.TimeoutHandler;
 import org.junit.Assert;
@@ -44,9 +46,12 @@ import java.util.logging.Level;
 
 public class TransportTest {
   private final LocalAddressProvider localAddressProvider;
+  private final TransportFactory tpFactory;
 
-  public TransportTest() {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  public TransportTest() throws InjectionException {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    this.tpFactory = injector.getInstance(TransportFactory.class);
   }
 
   final String logPrefix = "TEST ";
@@ -68,7 +73,7 @@ public class TransportTest {
 
     // Codec<String>
     ReceiverStage<String> stage = new ReceiverStage<String>(new ObjectSerializableCodec<String>(), monitor, expected);
-    Transport transport = new NettyMessagingTransport(hostAddress, port, stage, stage, 1, 10000);
+    Transport transport = tpFactory.newInstance(hostAddress, port, stage, stage, 1, 10000);
 
     // sending side
     Link<String> link = transport.open(
@@ -99,7 +104,7 @@ public class TransportTest {
 
     // Codec<TestEvent>
     ReceiverStage<TestEvent> stage = new ReceiverStage<TestEvent>(new ObjectSerializableCodec<TestEvent>(), monitor, expected);
-    Transport transport = new NettyMessagingTransport(hostAddress, port, stage, stage, 1, 10000);
+    Transport transport = tpFactory.newInstance(hostAddress, port, stage, stage, 1, 10000);
 
     // sending side
     Link<TestEvent> link = transport.open(