You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/07/10 18:38:38 UTC

incubator-reef git commit: [REEF-184] Allow NetworkService to deal with multiple message handlers This adds the NetworkConnectionService interface and its implementation. The interface provides a way to register multiple ConnectionFactory's, each

Repository: incubator-reef
Updated Branches:
  refs/heads/master 84ff5021c -> 41ff14187


[REEF-184] Allow NetworkService to deal with multiple message handlers
  This adds the NetworkConnectionService interface and its implementation.
  The interface provides a way to register multiple  ConnectionFactory's,
  each of which is used to create connections for a message type.

JIRA:
  [REEF-184] http://issues.apache.org/jira/browse/REEF-184

Pull Request:
  Closes #255

Author:
  Taegeon Um taegeonum@gmail.com
  GeonWoo Kim gwsshs22@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/41ff1418
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/41ff1418
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/41ff1418

Branch: refs/heads/master
Commit: 41ff1418781f885e4847e1c6386c40c9eeb30db8
Parents: 84ff502
Author: taegeonum <ta...@gmail.com>
Authored: Fri Jun 26 20:45:09 2015 +0900
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Sat Jul 11 01:29:04 2015 +0900

----------------------------------------------------------------------
 .../org/apache/reef/io/network/Connection.java  |  16 +-
 .../io/network/NetworkConnectionService.java    |  98 +++++
 .../BindNetworkConnectionServiceToTask.java     |  51 +++
 .../reef/io/network/impl/NSConnection.java      |  14 +-
 .../reef/io/network/impl/NetworkConnection.java |  94 +++++
 .../network/impl/NetworkConnectionFactory.java  | 103 +++++
 ...etworkConnectionServiceExceptionHandler.java |  42 ++
 .../impl/NetworkConnectionServiceImpl.java      | 239 ++++++++++++
 .../NetworkConnectionServiceLinkListener.java   |  52 +++
 .../impl/NetworkConnectionServiceMessage.java   | 115 ++++++
 .../NetworkConnectionServiceMessageCodec.java   | 146 +++++++
 .../NetworkConnectionServiceReceiveHandler.java |  51 +++
 .../network/impl/NetworkServiceParameters.java  |   2 +-
 .../UnbindNetworkConnectionServiceFromTask.java |  50 +++
 .../NetworkConnectionServiceIdFactory.java      |  29 ++
 .../config/NetworkConnectionServicePort.java    |  26 ++
 .../io/network/impl/config/package-info.java    |  22 ++
 .../network/NetworkConnectionServiceTest.java   | 385 +++++++++++++++++++
 .../services/network/NetworkServiceTest.java    |   8 +-
 .../network/util/NetworkMessagingTest.java      | 139 +++++++
 .../network/util/StreamingIntegerCodec.java     |  57 +++
 .../network/util/StreamingStringCodec.java      |  56 +++
 22 files changed, 1780 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
index 66f2840..17fc5a1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
@@ -20,6 +20,8 @@ package org.apache.reef.io.network;
 
 import org.apache.reef.exception.evaluator.NetworkException;
 
+import java.util.List;
+
 /**
  * Connection between two end-points named by identifiers.
  *
@@ -35,12 +37,18 @@ public interface Connection<T> extends AutoCloseable {
   void open() throws NetworkException;
 
   /**
-   * Writes an object to the connection.
+   * Writes a message to the connection.
    *
-   * @param obj
-   * @throws NetworkException
+   * @param message
+   */
+  void write(T message);
+
+  /**
+   * Writes a list of messages to the connection.
+   *
+   * @param messages
    */
-  void write(T obj) throws NetworkException;
+  void write(List<T> messages);
 
   /**
    * Closes the connection.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
new file mode 100644
index 0000000..472e619
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.impl.NetworkConnectionServiceImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+/**
+ * NetworkConnectionService.
+ *
+ * NetworkConnectionService is a service which is designed for communicating messages with each other.
+ * It creates multiple ConnectionFactories, which create multiple connections.
+ *
+ * Flow of message transfer:
+ * [Downstream]: connection.write(message) -> ConnectionFactory
+ * -> src NetworkConnectionService (encode) -> dest NetworkConnectionService.
+ * [Upstream]: message -> dest NetworkConnectionService (decode) -> ConnectionFactory -> EventHandler.
+ *
+ * Users can register a ConnectionFactory by registering their Codec, EventHandler and LinkListener.
+ * When users send messages via connections created by the ConnectionFactory,
+ *
+ * NetworkConnectionService encodes the messages according to the Codec
+ * registered in the ConnectionFactory and sends them to the destination NetworkConnectionService.
+ *
+ * Also, it receives the messages by decoding the messages and forwarding them
+ * to the corresponding EventHandler registered in the ConnectionFactory.
+ */
+@DefaultImplementation(NetworkConnectionServiceImpl.class)
+public interface NetworkConnectionService extends AutoCloseable {
+
+  /**
+   * Registers an instance of ConnectionFactory corresponding to the connectionFactoryId.
+   * Binds Codec, EventHandler and LinkListener to the ConnectionFactory.
+   * ConnectionFactory can create multiple connections between other NetworkConnectionServices.
+   *
+   * @param connectionFactoryId a connection factory id
+   * @param codec a codec for type <T>
+   * @param eventHandler an event handler for type <T>
+   * @param linkListener a link listener
+   * @throws NetworkException throws a NetworkException when multiple connectionFactoryIds exist.
+   */
+  <T> void registerConnectionFactory(final Identifier connectionFactoryId,
+                                     final Codec<T> codec,
+                                     final EventHandler<Message<T>> eventHandler,
+                                     final LinkListener<Message<T>> linkListener) throws NetworkException;
+
+  /**
+   * Unregisters a connectionFactory corresponding to the connectionFactoryId.
+   * @param connectionFactoryId a connection factory id
+   */
+  void unregisterConnectionFactory(final Identifier connectionFactoryId);
+
+  /**
+   * Gets an instance of ConnectionFactory which is registered by the registerConnectionFactory method.
+   * @param connectionFactoryId a connection factory id
+   */
+  <T> ConnectionFactory<T> getConnectionFactory(final Identifier connectionFactoryId);
+
+  /**
+   * Registers a network connection service identifier.
+   * This can be used for destination identifier
+   * @param ncsId network connection service identifier
+   */
+  void registerId(final Identifier ncsId);
+
+  /**
+   * Unregister a network connection service identifier.
+   * @param ncsId network connection service identifier
+   */
+  void unregisterId(final Identifier ncsId);
+
+  /**
+   * Gets a network connection service client id which is equal to the registered id.
+   */
+  Identifier getNetworkConnectionServiceId();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
new file mode 100644
index 0000000..69aafcf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * TaskStart event handler for registering NetworkConnectionService.
+ * Users have to bind this handler into ServiceConfiguration.ON_TASK_STARTED.
+ */
+public final class BindNetworkConnectionServiceToTask implements EventHandler<TaskStart> {
+
+  private final NetworkConnectionService ncs;
+  private final IdentifierFactory idFac;
+
+  @Inject
+  public BindNetworkConnectionServiceToTask(
+      final NetworkConnectionService ncs,
+      @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) {
+    this.ncs = ncs;
+    this.idFac = idFac;
+  }
+
+  @Override
+  public void onNext(final TaskStart task) {
+    this.ncs.registerId(this.idFac.getNewInstance(task.getId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
index b5fa9e3..4bfd0b7 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
@@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.transport.LinkListener;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -94,14 +95,19 @@ class NSConnection<T> implements Connection<T> {
   }
 
   /**
-   * Writes an object to the connection.
+   * Writes a message to the connection.
    *
-   * @param obj an object of type T
+   * @param message a message of type T
    * @throws a network exception
    */
   @Override
-  public void write(final T obj) throws NetworkException {
-    this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
+  public void write(final T message) {
+    this.link.write(new NSMessage<T>(this.srcId, this.destId, message));
+  }
+
+  @Override
+  public void write(List<T> messages) {
+    this.link.write(new NSMessage<T>(this.srcId, this.destId, messages));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
new file mode 100644
index 0000000..d2175e7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.transport.Link;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+final class NetworkConnection<T> implements Connection<T> {
+
+  private Link<NetworkConnectionServiceMessage<T>> link;
+
+  private final Identifier destId;
+  private final AtomicBoolean closed;
+  private final NetworkConnectionFactory connFactory;
+
+  /**
+   * Constructs a connection for destination identifier of NetworkConnectionService.
+   * @param connFactory a connection factory of this connection.
+   * @param destId a destination identifier of NetworkConnectionService.
+   */
+  NetworkConnection(
+      final NetworkConnectionFactory connFactory,
+      final Identifier destId) {
+    this.connFactory = connFactory;
+    this.destId = destId;
+    this.closed = new AtomicBoolean();
+  }
+
+  @Override
+  public void open() throws NetworkException {
+    link = connFactory.openLink(destId);
+  }
+
+  @Override
+  public void write(final List<T> messageList) {
+    final NetworkConnectionServiceMessage<T> nsMessage = new NetworkConnectionServiceMessage<>(
+        connFactory.getConnectionFactoryId(),
+        connFactory.getSrcId(),
+        destId,
+        messageList);
+    link.write(nsMessage);
+  }
+
+  @Override
+  public void write(final T message) {
+    final List<T> messageList = new ArrayList<>(1);
+    messageList.add(message);
+    write(messageList);
+  }
+
+  @Override
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      connFactory.removeConnection(this.destId);
+      link = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Connection from")
+        .append(connFactory.getSrcId())
+        .append(":")
+        .append(connFactory.getConnectionFactoryId())
+        .append(" to ")
+        .append(destId)
+        .append(":")
+        .append(connFactory.getConnectionFactoryId());
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
new file mode 100644
index 0000000..d49988d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A connection factory which is created by NetworkConnectionService.
+ */
+final class NetworkConnectionFactory<T> implements ConnectionFactory<T> {
+
+  private final ConcurrentMap<Identifier, Connection<T>> connectionMap;
+  private final String connFactoryId;
+  private final Codec<T> eventCodec;
+  private final EventHandler<Message<T>> eventHandler;
+  private final LinkListener<Message<T>> eventListener;
+  private final NetworkConnectionServiceImpl networkService;
+
+  NetworkConnectionFactory(
+      final NetworkConnectionServiceImpl networkService,
+      final String connFactoryId,
+      final Codec<T> eventCodec,
+      final EventHandler<Message<T>> eventHandler,
+      final LinkListener<Message<T>> eventListener) {
+    this.networkService = networkService;
+    this.connectionMap = new ConcurrentHashMap<>();
+    this.connFactoryId = connFactoryId;
+    this.eventCodec = eventCodec;
+    this.eventHandler = eventHandler;
+    this.eventListener = eventListener;
+  }
+
+  /**
+   * Creates a new connection.
+   * @param destId a destination identifier of NetworkConnectionService.
+   */
+  @Override
+  public Connection<T> newConnection(final Identifier destId) {
+    final Connection<T> connection = connectionMap.get(destId);
+
+    if (connection == null) {
+      final Connection<T> newConnection = new NetworkConnection<>(this, destId);
+      connectionMap.putIfAbsent(destId, newConnection);
+      return connectionMap.get(destId);
+    }
+    return connection;
+  }
+
+  <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteId) throws NetworkException {
+    return networkService.openLink(remoteId);
+  }
+
+  String getConnectionFactoryId() {
+    return this.connFactoryId;
+  }
+
+  Identifier getSrcId() {
+    return this.networkService.getNetworkConnectionServiceId();
+  }
+
+  EventHandler<Message<T>> getEventHandler() {
+    return eventHandler;
+  }
+
+  LinkListener<Message<T>> getLinkListener() {
+    return eventListener;
+  }
+
+  public void removeConnection(final Identifier remoteId) {
+    connectionMap.remove(remoteId);
+  }
+
+  Codec<T> getCodec() {
+    return eventCodec;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
new file mode 100644
index 0000000..2e4bd43
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default exception handler.
+ */
+public final class NetworkConnectionServiceExceptionHandler implements EventHandler<Exception> {
+
+  private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceExceptionHandler.class.getName());
+
+  @Inject
+  public NetworkConnectionServiceExceptionHandler() {
+  }
+
+  @Override
+  public void onNext(final Exception value) {
+    LOG.log(Level.WARNING, "An exception occurred in transport of NetworkConnectionService: {0}", value);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
new file mode 100644
index 0000000..f5e1f83
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default Network connection service implementation.
+ */
+public final class NetworkConnectionServiceImpl implements NetworkConnectionService {
+
+  private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName());
+
+  /**
+   * An identifier factory registering network connection service id.
+   */
+  private final IdentifierFactory idFactory;
+  /**
+   * A name resolver looking up nameserver.
+   */
+  private final NameResolver nameResolver;
+  /**
+   * A messaging transport.
+   */
+  private final Transport transport;
+  /**
+   * A map of (id of connection factory, a connection factory instance).
+   */
+  private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap;
+  /**
+   * A network connection service identifier.
+   */
+  private Identifier myId;
+  /**
+   * A network connection service message codec.
+   */
+  private final Codec<NetworkConnectionServiceMessage> nsCodec;
+  /**
+   * A network connection service link listener.
+   */
+  private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener;
+  /**
+   * A stage registering identifiers to nameServer.
+   */
+  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
+  /**
+   * A stage unregistering identifiers from nameServer.
+   */
+  private final EStage<Identifier> nameServiceUnregisteringStage;
+
+  @Inject
+  private NetworkConnectionServiceImpl(
+      @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFactory,
+      @Parameter(NetworkConnectionServicePort.class) final int nsPort,
+      final TransportFactory transportFactory,
+      final NameResolver nameResolver) {
+    this.idFactory = idFactory;
+    this.connFactoryMap = new ConcurrentHashMap<>();
+    this.nsCodec = new NetworkConnectionServiceMessageCodec(idFactory, connFactoryMap);
+    this.nsLinkListener = new NetworkConnectionServiceLinkListener(connFactoryMap);
+    final EventHandler<TransportEvent> recvHandler =
+        new NetworkConnectionServiceReceiveHandler(connFactoryMap, nsCodec);
+    this.nameResolver = nameResolver;
+    this.transport = transportFactory.newInstance(nsPort, recvHandler, recvHandler,
+        new NetworkConnectionServiceExceptionHandler());
+
+    this.nameServiceRegisteringStage = new SingleThreadStage<>(
+        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
+          @Override
+          public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
+            try {
+              nameResolver.register(tuple.getKey(), tuple.getValue());
+              LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
+            } catch (final Exception ex) {
+              final String msg = "Unable to register " + tuple.getKey() + " with name service";
+              LOG.log(Level.WARNING, msg, ex);
+              throw new RuntimeException(msg, ex);
+            }
+          }
+        }, 5);
+
+    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
+        "NameServiceRegisterer", new EventHandler<Identifier>() {
+          @Override
+          public void onNext(final Identifier id) {
+            try {
+              nameResolver.unregister(id);
+              LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
+            } catch (final Exception ex) {
+              final String msg = "Unable to unregister " + id + " with name service";
+              LOG.log(Level.WARNING, msg, ex);
+              throw new RuntimeException(msg, ex);
+            }
+          }
+        }, 5);
+  }
+
+  @Override
+  public <T> void registerConnectionFactory(final Identifier connFactoryId,
+                                            final Codec<T> codec,
+                                            final EventHandler<Message<T>> eventHandler,
+                                            final LinkListener<Message<T>> linkListener) throws NetworkException {
+    String id = connFactoryId.toString();
+    if (connFactoryMap.get(id) != null) {
+      throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered.");
+    }
+    final ConnectionFactory connFactory = connFactoryMap.putIfAbsent(id,
+        new NetworkConnectionFactory<>(this, id, codec, eventHandler, linkListener));
+
+    if (connFactory != null) {
+      throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered.");
+    }
+  }
+
+  @Override
+  public void unregisterConnectionFactory(final Identifier connFactoryId) {
+    final String id = connFactoryId.toString();
+    final ConnectionFactory  connFactory = connFactoryMap.get(id);
+    if (connFactory != null) {
+      final ConnectionFactory cf = connFactoryMap.remove(id);
+      if (cf == null) {
+        LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
+      }
+    } else {
+      LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
+    }
+  }
+
+  /**
+   * Registers a source identifier of NetworkConnectionService.
+   * @param ncsId
+   * @throws Exception
+   */
+  @Override
+  public void registerId(final Identifier ncsId) {
+    LOG.log(Level.INFO, "Registering NetworkConnectionService " + ncsId);
+    this.myId = ncsId;
+    final Tuple<Identifier, InetSocketAddress> tuple =
+        new Tuple<>(ncsId, (InetSocketAddress) this.transport.getLocalAddress());
+    LOG.log(Level.FINEST, "Binding {0} to NetworkConnectionService@({1})",
+        new Object[]{tuple.getKey(), tuple.getValue()});
+    this.nameServiceRegisteringStage.onNext(tuple);
+  }
+
+  /**
+   * Open a channel for destination identifier of NetworkConnectionService.
+   * @param destId
+   * @throws NetworkException
+   */
+  <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier destId) throws NetworkException {
+    try {
+      final SocketAddress address = nameResolver.lookup(destId);
+      if (address == null) {
+        throw new NetworkException("Lookup " + destId + " is null");
+      }
+      return transport.open(address, nsCodec, nsLinkListener);
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw new NetworkException(e);
+    }
+  }
+
+  /**
+   * Gets a ConnectionFactory.
+   * @param connFactoryId the identifier of the ConnectionFactory
+   */
+  @Override
+  public <T> ConnectionFactory<T> getConnectionFactory(final Identifier connFactoryId) {
+    final ConnectionFactory<T> connFactory = connFactoryMap.get(connFactoryId.toString());
+    if (connFactory == null) {
+      throw new RuntimeException("Cannot find ConnectionFactory of " + connFactoryId + ".");
+    }
+    return connFactory;
+  }
+
+  @Override
+  public void unregisterId(final Identifier ncsId) {
+    LOG.log(Level.FINEST, "Unbinding {0} to NetworkConnectionService@({1})",
+        new Object[]{ncsId, this.transport.getLocalAddress()});
+    this.myId = null;
+    this.nameServiceUnregisteringStage.onNext(ncsId);
+  }
+
+  @Override
+  public Identifier getNetworkConnectionServiceId() {
+    return this.myId;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.log(Level.FINE, "Shutting down");
+    this.nameServiceRegisteringStage.close();
+    this.nameServiceUnregisteringStage.close();
+    this.transport.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
new file mode 100644
index 0000000..2c9528c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.Map;
+
+final class NetworkConnectionServiceLinkListener implements LinkListener<NetworkConnectionServiceMessage> {
+
+  private final Map<String, NetworkConnectionFactory> connFactoryMap;
+
+  NetworkConnectionServiceLinkListener(
+      final Map<String, NetworkConnectionFactory> connFactoryMap) {
+    this.connFactoryMap = connFactoryMap;
+  }
+
+  @Override
+  public void onSuccess(final NetworkConnectionServiceMessage message) {
+    final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener();
+    if (listener != null) {
+      listener.onSuccess(message);
+    }
+
+  }
+
+  @Override
+  public void onException(final Throwable cause, final SocketAddress remoteAddress,
+                          final NetworkConnectionServiceMessage message) {
+    final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener();
+    if (listener != null) {
+      listener.onException(cause, remoteAddress, message);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
new file mode 100644
index 0000000..9ce10c1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.Identifier;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * NetworkConnectionServiceMessage implementation.
+ * This is a wrapper message of message type <T>.
+ */
+final class NetworkConnectionServiceMessage<T> implements Message<T> {
+
+  private final List<T> messages;
+  private SocketAddress remoteAddr;
+  private final String connFactoryId;
+  private final Identifier srcId;
+  private final Identifier destId;
+
+  /**
+   * Constructs a network connection service message.
+   *
+   * @param connFactoryId the connection factory identifier
+   * @param srcId      the source identifier of NetworkConnectionService
+   * @param destId   the destination identifier of NetworkConnectionService
+   * @param messages  the list of messages
+   */
+  public NetworkConnectionServiceMessage(
+      final String connFactoryId,
+      final Identifier srcId,
+      final Identifier destId,
+      final List<T> messages) {
+    this.connFactoryId = connFactoryId;
+    this.srcId = srcId;
+    this.destId = destId;
+    this.messages = messages;
+  }
+
+  void setRemoteAddress(final SocketAddress remoteAddress) {
+    this.remoteAddr = remoteAddress;
+  }
+
+  /**
+   * Gets a destination identifier.
+   *
+   * @return a remote id
+   */
+  @Override
+  public Identifier getDestId() {
+    return destId;
+  }
+
+  /**
+   * Gets a connection factory identifier.
+   *
+   * @return a connection factory id
+   */
+  public String getConnectionFactoryId() {
+    return connFactoryId;
+  }
+
+
+  /**
+   * Gets a source identifier of NetworkConnectionService.
+   *
+   * @return a source id
+   */
+  @Override
+  public Identifier getSrcId() {
+    return srcId;
+  }
+
+  @Override
+  public List<T> getData() {
+    return messages;
+  }
+
+  /**
+   * Returns a string representation of this object.
+   *
+   * @return a string representation of this object
+   */
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("NSMessage");
+    builder.append(" remoteID=");
+    builder.append(destId);
+    builder.append(" message=[| ");
+    for (T message : messages) {
+      builder.append(message + " |");
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
new file mode 100644
index 0000000..bdccff2
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * DefaultNetworkMessageCodec implementation.
+ * This codec encodes/decodes NetworkConnectionServiceMessageImpl according to the type <T>.
+ */
+final class NetworkConnectionServiceMessageCodec implements Codec<NetworkConnectionServiceMessage> {
+
+  private final IdentifierFactory factory;
+  /**
+   * Contains entries of (id of connection factory, instance of connection factory).
+   */
+  private final Map<String, NetworkConnectionFactory> connFactoryMap;
+  /**
+   * Contains entries of (instance of codec, boolean whether the codec is streaming or not).
+   */
+  private final ConcurrentMap<Codec, Boolean> isStreamingCodecMap;
+
+  /**
+   * Constructs a network connection service message codec.
+   */
+  NetworkConnectionServiceMessageCodec(
+      final IdentifierFactory factory,
+      final Map<String, NetworkConnectionFactory> connFactoryMap) {
+    this.factory = factory;
+    this.connFactoryMap = connFactoryMap;
+    this.isStreamingCodecMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Encodes a network connection service message to bytes.
+   * @param obj a message
+   * @return bytes
+   */
+  @Override
+  public byte[] encode(final NetworkConnectionServiceMessage obj) {
+    final Codec codec = connFactoryMap.get(obj.getConnectionFactoryId()).getCodec();
+    Boolean isStreamingCodec = isStreamingCodecMap.get(codec);
+    if (isStreamingCodec == null) {
+      isStreamingCodec = codec instanceof StreamingCodec;
+      isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec);
+    }
+
+    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (final DataOutputStream daos = new DataOutputStream(baos)) {
+        daos.writeUTF(obj.getConnectionFactoryId());
+        daos.writeUTF(obj.getSrcId().toString());
+        daos.writeUTF(obj.getDestId().toString());
+        daos.writeInt(obj.getData().size());
+
+        if (isStreamingCodec) {
+          for (final Object rec : obj.getData()) {
+            ((StreamingCodec) codec).encodeToStream(rec, daos);
+          }
+        } else {
+          final Iterable dataList = obj.getData();
+          for (final Object message : dataList) {
+            final byte[] bytes = codec.encode(message);
+            daos.writeInt(bytes.length);
+            daos.write(bytes);
+          }
+        }
+        return baos.toByteArray();
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+  /**
+   * Decodes a network connection service message from bytes.
+   *
+   * @param data bytes
+   * @return a message
+   */
+  @Override
+  public NetworkConnectionServiceMessage decode(final byte[] data) {
+    try (final ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+      try (final DataInputStream dais = new DataInputStream(bais)) {
+        final String connFactoryId = dais.readUTF();
+        final Identifier srcId = factory.getNewInstance(dais.readUTF());
+        final Identifier destId = factory.getNewInstance(dais.readUTF());
+        final int size = dais.readInt();
+        final List list = new ArrayList(size);
+        final Codec codec = connFactoryMap.get(connFactoryId).getCodec();
+        Boolean isStreamingCodec = isStreamingCodecMap.get(codec);
+        if (isStreamingCodec == null) {
+          isStreamingCodec = codec instanceof StreamingCodec;
+          isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec);
+        }
+
+        if (isStreamingCodec) {
+          for (int i = 0; i < size; i++) {
+            list.add(((StreamingCodec) codec).decodeFromStream(dais));
+          }
+        } else {
+          for (int i = 0; i < size; i++) {
+            final int byteSize = dais.readInt();
+            final byte[] bytes = new byte[byteSize];
+            dais.read(bytes);
+            list.add(codec.decode(bytes));
+          }
+        }
+
+        return new NetworkConnectionServiceMessage(
+            connFactoryId,
+            srcId,
+            destId,
+            list
+        );
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
new file mode 100644
index 0000000..1875e79
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+
+import java.util.Map;
+
+/**
+ * NetworkConnectionService event handler.
+ * It dispatches events to the corresponding eventHandler.
+ */
+final class NetworkConnectionServiceReceiveHandler implements EventHandler<TransportEvent> {
+
+  private final Map<String, NetworkConnectionFactory> connFactoryMap;
+  private final Codec<NetworkConnectionServiceMessage> codec;
+
+  NetworkConnectionServiceReceiveHandler(
+      final Map<String, NetworkConnectionFactory> connFactoryMap,
+      final Codec<NetworkConnectionServiceMessage> codec) {
+    this.connFactoryMap = connFactoryMap;
+    this.codec = codec;
+  }
+
+  @Override
+  public void onNext(final TransportEvent transportEvent) {
+    final NetworkConnectionServiceMessage nsMessage = codec.decode(transportEvent.getData());
+    nsMessage.setRemoteAddress(transportEvent.getRemoteAddress());
+    final NetworkConnectionFactory connFactory = connFactoryMap.get(nsMessage.getConnectionFactoryId());
+    final EventHandler eventHandler = connFactory.getEventHandler();
+    eventHandler.onNext(nsMessage);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
index c79f88a..a2f5f2a 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -39,7 +39,7 @@ public class NetworkServiceParameters {
   public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
   }
 
-  @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "7070")
+  @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "0")
   public static class NetworkServicePort implements Name<Integer> {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
new file mode 100644
index 0000000..540fe80
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+/**
+ * TaskStop event handler for unregistering NetworkConnectionService.
+ * Users have to bind this handler into ServiceConfiguration.ON_TASK_STOP.
+ */
+public final class UnbindNetworkConnectionServiceFromTask implements EventHandler<TaskStop> {
+
+  private final NetworkConnectionService ncs;
+  private final IdentifierFactory idFac;
+
+  @Inject
+  public UnbindNetworkConnectionServiceFromTask(
+      final NetworkConnectionService ncs,
+      @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) {
+    this.ncs = ncs;
+    this.idFac = idFac;
+  }
+
+  @Override
+  public void onNext(final TaskStop task) {
+    this.ncs.unregisterId(this.idFac.getNewInstance(task.getId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
new file mode 100644
index 0000000..67c3e96
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl.config;
+
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+@NamedParameter(doc = "identifier factory for the service", short_name = "ncsfactory",
+    default_class = StringIdentifierFactory.class)
+public final class NetworkConnectionServiceIdFactory implements Name<IdentifierFactory> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
new file mode 100644
index 0000000..abaf840
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl.config;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "port for the network connection service", short_name = "ncsport", default_value = "0")
+public final class NetworkConnectionServicePort implements Name<Integer> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
new file mode 100644
index 0000000..e54151a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * TODO: Document.
+ */
+package org.apache.reef.io.network.impl.config;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
new file mode 100644
index 0000000..ddf4bfd
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.util.StringCodec;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.services.network.util.*;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default Network connection service test.
+ */
+public class NetworkConnectionServiceTest {
+  private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName());
+
+  private final LocalAddressProvider localAddressProvider;
+  private final String localAddress;
+  private final Identifier groupCommClientId;
+  private final Identifier shuffleClientId;
+
+  public NetworkConnectionServiceTest() throws InjectionException {
+    localAddressProvider = LocalAddressProviderFactory.getInstance();
+    localAddress = localAddressProvider.getLocalAddress();
+
+    final IdentifierFactory idFac = new StringIdentifierFactory();
+    this.groupCommClientId = idFac.getNewInstance("groupComm");
+    this.shuffleClientId = idFac.getNewInstance("shuffle");
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private void runMessagingNetworkConnectionService(Codec<String> codec) throws Exception {
+    final int numMessages = 2000;
+    final Monitor monitor = new Monitor();
+    final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+    messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+
+    final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+    try {
+      conn.open();
+      for (int count = 0; count < numMessages; ++count) {
+        // send messages to the receiver.
+        conn.write("hello" + count);
+      }
+      monitor.mwait();
+    } catch (NetworkException e) {
+      e.printStackTrace();
+    }
+
+    conn.close();
+    messagingTest.close();
+  }
+
+  /**
+   * NetworkConnectionService messaging test.
+   */
+  @Test
+  public void testMessagingNetworkConnectionService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StringCodec());
+  }
+
+  /**
+   * NetworkConnectionService streaming messaging test.
+   */
+  @Test
+  public void testStreamingMessagingNetworkConnectionService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StreamingStringCodec());
+  }
+
+  public void runNetworkConnServiceWithMultipleConnFactories(Codec<String> stringCodec, Codec<Integer> integerCodec) throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    final int groupcommMessages = 1000;
+    final Monitor monitor = new Monitor();
+    final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+
+    messagingTest.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec);
+
+    final int shuffleMessges = 2000;
+    final Monitor monitor2 = new Monitor();
+    messagingTest.registerTestConnectionFactory(shuffleClientId, shuffleMessges, monitor2, integerCodec);
+
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+        try {
+          conn.open();
+          for (int count = 0; count < groupcommMessages; ++count) {
+            // send messages to the receiver.
+            conn.write("hello" + count);
+          }
+          monitor.mwait();
+          conn.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        final Connection<Integer> conn = messagingTest.getConnectionFromSenderToReceiver(shuffleClientId);
+        try {
+          conn.open();
+          for (int count = 0; count < shuffleMessges; ++count) {
+            // send messages to the receiver.
+            conn.write(count);
+          }
+          monitor2.mwait();
+          conn.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    monitor.mwait();
+    monitor2.mwait();
+    executor.shutdown();
+    messagingTest.close();
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>());
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories with Streamingcodec.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec());
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+    for (int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+      messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+      final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+      // build the message
+      final StringBuilder msb = new StringBuilder();
+      for (int i = 0; i < size; i++) {
+        msb.append("1");
+      }
+      final String message = msb.toString();
+
+      long start = System.currentTimeMillis();
+      try {
+        conn.open();
+        for (int count = 0; count < numMessages; ++count) {
+          // send messages to the receiver.
+          conn.write(message);
+        }
+        monitor.mwait();
+      } catch (NetworkException e) {
+        e.printStackTrace();
+      }
+      final long end = System.currentTimeMillis();
+
+      final double runtime = ((double) end - start) / 1000;
+      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars
+      messagingTest.close();
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+    final int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    final ExecutorService e = Executors.newCachedThreadPool();
+    for (int t = 0; t < numThreads; t++) {
+      final int tt = t;
+
+      e.submit(new Runnable() {
+        public void run() {
+          try {
+            final Monitor monitor = new Monitor();
+            final Codec<String> codec = new StringCodec();
+            final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+            messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+            final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+            // build the message
+            final StringBuilder msb = new StringBuilder();
+            for (int i = 0; i < size; i++) {
+              msb.append("1");
+            }
+            final String message = msb.toString();
+
+            try {
+              conn.open();
+              for (int count = 0; count < numMessages; ++count) {
+                // send messages to the receiver.
+                conn.write(message);
+              }
+              monitor.mwait();
+            } catch (NetworkException e) {
+              e.printStackTrace();
+            }
+            messagingTest.close();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    // start and time
+    final long start = System.currentTimeMillis();
+    final Object ignore = new Object();
+    for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+    e.shutdown();
+    e.awaitTermination(100, TimeUnit.SECONDS);
+    final long end = System.currentTimeMillis();
+    final double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+  }
+
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+
+    for (int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int numThreads = 2;
+      final int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+      messagingTest.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec);
+
+      final ExecutorService e = Executors.newCachedThreadPool();
+
+      // build the message
+      final StringBuilder msb = new StringBuilder();
+      for (int i = 0; i < size; i++) {
+        msb.append("1");
+      }
+      final String message = msb.toString();
+      final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+      final long start = System.currentTimeMillis();
+      for (int i = 0; i < numThreads; i++) {
+        e.submit(new Runnable() {
+          @Override
+          public void run() {
+
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+          }
+        });
+      }
+
+      e.shutdown();
+      e.awaitTermination(30, TimeUnit.SECONDS);
+      monitor.mwait();
+      final long end = System.currentTimeMillis();
+      final double runtime = ((double) end - start) / 1000;
+      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+      conn.close();
+      messagingTest.close();
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (int size : messageSizes) {
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+      messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+      final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+      // build the message
+      final StringBuilder msb = new StringBuilder();
+      for (int i = 0; i < size; i++) {
+        msb.append("1");
+      }
+      final String message = msb.toString();
+
+      final long start = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < numMessages; i++) {
+          final StringBuilder sb = new StringBuilder();
+          for (int j = 0; j < batchSize / size; j++) {
+            sb.append(message);
+          }
+          conn.open();
+          conn.write(sb.toString());
+        }
+        monitor.mwait();
+      } catch (NetworkException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+
+      final long end = System.currentTimeMillis();
+      final double runtime = ((double) end - start) / 1000;
+      final long numAppMessages = numMessages * batchSize / size;
+      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
+      messagingTest.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index c1f76b8..2bb75ae 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -387,12 +387,8 @@ public class NetworkServiceTest {
 
           @Override
           public void run() {
-            try {
-              for (int i = 0; i < numMessages; i++) {
-                conn.write(message);
-              }
-            } catch (NetworkException e) {
-              e.printStackTrace();
+            for (int i = 0; i < numMessages; i++) {
+              conn.write(message);
             }
           }
         });

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
new file mode 100644
index 0000000..ba0cfe4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for NetworkConnectionService test.
+ */
+public final class NetworkMessagingTest {
+  private static final Logger LOG = Logger.getLogger(NetworkMessagingTest.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final String receiver;
+  private final String sender;
+  private final NameServer nameServer;
+
+  public NetworkMessagingTest(final String localAddress) throws InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+    // network service for receiver
+    this.receiver = "receiver";
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+    this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+    // network service for sender
+    this.sender = "sender";
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+    senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+  }
+
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final Monitor monitor,
+                                                final Codec<T> codec) throws NetworkException {
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+  }
+
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+    final Identifier destId = factory.getNewInstance(receiver);
+    return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+  }
+
+  public void close() throws Exception {
+    senderNetworkConnService.close();
+    receiverNetworkConnService.close();
+    nameServer.close();
+  }
+
+  public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected) {
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(Message<T> value) {
+      count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", count.get());
+      LOG.log(Level.FINE,
+          "OUT: {0} received {1} from {2} to {3}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      if (count.get() == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  public static final class TestListener<T> implements LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(Message<T> message) {
+      LOG.log(Level.FINE, "success: " + message);
+    }
+    @Override
+    public void onException(Throwable cause, SocketAddress remoteAddress, Message<T> message) {
+      LOG.log(Level.WARNING, "exception: " + cause + message);
+      throw new RuntimeException(cause);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
new file mode 100644
index 0000000..eaec90f
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+public class StreamingIntegerCodec implements StreamingCodec<Integer> {
+
+  @Override
+  public void encodeToStream(Integer obj, DataOutputStream stream) {
+    try {
+      stream.writeInt(obj);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Integer decodeFromStream(DataInputStream stream) {
+    try {
+      return stream.readInt();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Integer decode(byte[] data) {
+    return null;
+  }
+
+  @Override
+  public byte[] encode(Integer obj) {
+    return new byte[0];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
new file mode 100644
index 0000000..497c7fa
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+public class StreamingStringCodec implements StreamingCodec<String> {
+  @Override
+  public byte[] encode(String obj) {
+    return obj.getBytes();
+  }
+
+  @Override
+  public String decode(byte[] buf) {
+    return new String(buf);
+  }
+
+  @Override
+  public void encodeToStream(String obj, DataOutputStream stream) {
+    try {
+      stream.writeUTF(obj);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String decodeFromStream(DataInputStream stream) {
+    try {
+      return stream.readUTF();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}