You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/07/10 18:38:38 UTC
incubator-reef git commit: [REEF-184] Allow NetworkService to deal
with multiple message handlers This adds the NetworkConnectionService
interface and its implementation. The interface provides a way to register
multiple ConnectionFactory's, each
Repository: incubator-reef
Updated Branches:
refs/heads/master 84ff5021c -> 41ff14187
[REEF-184] Allow NetworkService to deal with multiple message handlers
This adds the NetworkConnectionService interface and its implementation.
The interface provides a way to register multiple ConnectionFactory's,
each of which is used to create connections for a message type.
JIRA:
[REEF-184] http://issues.apache.org/jira/browse/REEF-184
Pull Request:
Closes #255
Author:
Taegeon Um taegeonum@gmail.com
GeonWoo Kim gwsshs22@gmail.com
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/41ff1418
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/41ff1418
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/41ff1418
Branch: refs/heads/master
Commit: 41ff1418781f885e4847e1c6386c40c9eeb30db8
Parents: 84ff502
Author: taegeonum <ta...@gmail.com>
Authored: Fri Jun 26 20:45:09 2015 +0900
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Sat Jul 11 01:29:04 2015 +0900
----------------------------------------------------------------------
.../org/apache/reef/io/network/Connection.java | 16 +-
.../io/network/NetworkConnectionService.java | 98 +++++
.../BindNetworkConnectionServiceToTask.java | 51 +++
.../reef/io/network/impl/NSConnection.java | 14 +-
.../reef/io/network/impl/NetworkConnection.java | 94 +++++
.../network/impl/NetworkConnectionFactory.java | 103 +++++
...etworkConnectionServiceExceptionHandler.java | 42 ++
.../impl/NetworkConnectionServiceImpl.java | 239 ++++++++++++
.../NetworkConnectionServiceLinkListener.java | 52 +++
.../impl/NetworkConnectionServiceMessage.java | 115 ++++++
.../NetworkConnectionServiceMessageCodec.java | 146 +++++++
.../NetworkConnectionServiceReceiveHandler.java | 51 +++
.../network/impl/NetworkServiceParameters.java | 2 +-
.../UnbindNetworkConnectionServiceFromTask.java | 50 +++
.../NetworkConnectionServiceIdFactory.java | 29 ++
.../config/NetworkConnectionServicePort.java | 26 ++
.../io/network/impl/config/package-info.java | 22 ++
.../network/NetworkConnectionServiceTest.java | 385 +++++++++++++++++++
.../services/network/NetworkServiceTest.java | 8 +-
.../network/util/NetworkMessagingTest.java | 139 +++++++
.../network/util/StreamingIntegerCodec.java | 57 +++
.../network/util/StreamingStringCodec.java | 56 +++
22 files changed, 1780 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
index 66f2840..17fc5a1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
@@ -20,6 +20,8 @@ package org.apache.reef.io.network;
import org.apache.reef.exception.evaluator.NetworkException;
+import java.util.List;
+
/**
* Connection between two end-points named by identifiers.
*
@@ -35,12 +37,18 @@ public interface Connection<T> extends AutoCloseable {
void open() throws NetworkException;
/**
- * Writes an object to the connection.
+ * Writes a message to the connection.
*
- * @param obj
- * @throws NetworkException
+ * @param message
+ */
+ void write(T message);
+
+ /**
+ * Writes a list of messages to the connection.
+ *
+ * @param messages
*/
- void write(T obj) throws NetworkException;
+ void write(List<T> messages);
/**
* Closes the connection.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
new file mode 100644
index 0000000..472e619
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.impl.NetworkConnectionServiceImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+/**
+ * NetworkConnectionService.
+ *
+ * NetworkConnectionService is a service which is designed for communicating messages with each other.
+ * It creates multiple ConnectionFactories, which create multiple connections.
+ *
+ * Flow of message transfer:
+ * [Downstream]: connection.write(message) -> ConnectionFactory
+ * -> src NetworkConnectionService (encode) -> dest NetworkConnectionService.
+ * [Upstream]: message -> dest NetworkConnectionService (decode) -> ConnectionFactory -> EventHandler.
+ *
+ * Users can register a ConnectionFactory by registering their Codec, EventHandler and LinkListener.
+ * When users send messages via connections created by the ConnectionFactory,
+ *
+ * NetworkConnectionService encodes the messages according to the Codec
+ * registered in the ConnectionFactory and sends them to the destination NetworkConnectionService.
+ *
+ * Also, it receives the messages by decoding the messages and forwarding them
+ * to the corresponding EventHandler registered in the ConnectionFactory.
+ */
+@DefaultImplementation(NetworkConnectionServiceImpl.class)
+public interface NetworkConnectionService extends AutoCloseable {
+
+ /**
+ * Registers an instance of ConnectionFactory corresponding to the connectionFactoryId.
+ * Binds Codec, EventHandler and LinkListener to the ConnectionFactory.
+ * ConnectionFactory can create multiple connections between other NetworkConnectionServices.
+ *
+ * @param connectionFactoryId a connection factory id
+ * @param codec a codec for type <T>
+ * @param eventHandler an event handler for type <T>
+ * @param linkListener a link listener
+ * @throws NetworkException throws a NetworkException when multiple connectionFactoryIds exist.
+ */
+ <T> void registerConnectionFactory(final Identifier connectionFactoryId,
+ final Codec<T> codec,
+ final EventHandler<Message<T>> eventHandler,
+ final LinkListener<Message<T>> linkListener) throws NetworkException;
+
+ /**
+ * Unregisters a connectionFactory corresponding to the connectionFactoryId.
+ * @param connectionFactoryId a connection factory id
+ */
+ void unregisterConnectionFactory(final Identifier connectionFactoryId);
+
+ /**
+ * Gets an instance of ConnectionFactory which is registered by the registerConnectionFactory method.
+ * @param connectionFactoryId a connection factory id
+ */
+ <T> ConnectionFactory<T> getConnectionFactory(final Identifier connectionFactoryId);
+
+ /**
+ * Registers a network connection service identifier.
+ * This can be used for destination identifier
+ * @param ncsId network connection service identifier
+ */
+ void registerId(final Identifier ncsId);
+
+ /**
+ * Unregister a network connection service identifier.
+ * @param ncsId network connection service identifier
+ */
+ void unregisterId(final Identifier ncsId);
+
+ /**
+ * Gets a network connection service client id which is equal to the registered id.
+ */
+ Identifier getNetworkConnectionServiceId();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
new file mode 100644
index 0000000..69aafcf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+/**
+ * TaskStart event handler for registering NetworkConnectionService.
+ * Users have to bind this handler into ServiceConfiguration.ON_TASK_STARTED.
+ */
+public final class BindNetworkConnectionServiceToTask implements EventHandler<TaskStart> {
+
+ private final NetworkConnectionService ncs;
+ private final IdentifierFactory idFac;
+
+ @Inject
+ public BindNetworkConnectionServiceToTask(
+ final NetworkConnectionService ncs,
+ @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) {
+ this.ncs = ncs;
+ this.idFac = idFac;
+ }
+
+ @Override
+ public void onNext(final TaskStart task) {
+ this.ncs.registerId(this.idFac.getNewInstance(task.getId()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
index b5fa9e3..4bfd0b7 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
@@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.transport.LinkListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -94,14 +95,19 @@ class NSConnection<T> implements Connection<T> {
}
/**
- * Writes an object to the connection.
+ * Writes a message to the connection.
*
- * @param obj an object of type T
+ * @param message a message of type T
* @throws a network exception
*/
@Override
- public void write(final T obj) throws NetworkException {
- this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
+ public void write(final T message) {
+ this.link.write(new NSMessage<T>(this.srcId, this.destId, message));
+ }
+
+ @Override
+ public void write(List<T> messages) {
+ this.link.write(new NSMessage<T>(this.srcId, this.destId, messages));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
new file mode 100644
index 0000000..d2175e7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.transport.Link;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+final class NetworkConnection<T> implements Connection<T> {
+
+ private Link<NetworkConnectionServiceMessage<T>> link;
+
+ private final Identifier destId;
+ private final AtomicBoolean closed;
+ private final NetworkConnectionFactory connFactory;
+
+ /**
+ * Constructs a connection for destination identifier of NetworkConnectionService.
+ * @param connFactory a connection factory of this connection.
+ * @param destId a destination identifier of NetworkConnectionService.
+ */
+ NetworkConnection(
+ final NetworkConnectionFactory connFactory,
+ final Identifier destId) {
+ this.connFactory = connFactory;
+ this.destId = destId;
+ this.closed = new AtomicBoolean();
+ }
+
+ @Override
+ public void open() throws NetworkException {
+ link = connFactory.openLink(destId);
+ }
+
+ @Override
+ public void write(final List<T> messageList) {
+ final NetworkConnectionServiceMessage<T> nsMessage = new NetworkConnectionServiceMessage<>(
+ connFactory.getConnectionFactoryId(),
+ connFactory.getSrcId(),
+ destId,
+ messageList);
+ link.write(nsMessage);
+ }
+
+ @Override
+ public void write(final T message) {
+ final List<T> messageList = new ArrayList<>(1);
+ messageList.add(message);
+ write(messageList);
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ connFactory.removeConnection(this.destId);
+ link = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Connection from")
+ .append(connFactory.getSrcId())
+ .append(":")
+ .append(connFactory.getConnectionFactoryId())
+ .append(" to ")
+ .append(destId)
+ .append(":")
+ .append(connFactory.getConnectionFactoryId());
+ return sb.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
new file mode 100644
index 0000000..d49988d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A connection factory which is created by NetworkConnectionService.
+ */
+final class NetworkConnectionFactory<T> implements ConnectionFactory<T> {
+
+ private final ConcurrentMap<Identifier, Connection<T>> connectionMap;
+ private final String connFactoryId;
+ private final Codec<T> eventCodec;
+ private final EventHandler<Message<T>> eventHandler;
+ private final LinkListener<Message<T>> eventListener;
+ private final NetworkConnectionServiceImpl networkService;
+
+ NetworkConnectionFactory(
+ final NetworkConnectionServiceImpl networkService,
+ final String connFactoryId,
+ final Codec<T> eventCodec,
+ final EventHandler<Message<T>> eventHandler,
+ final LinkListener<Message<T>> eventListener) {
+ this.networkService = networkService;
+ this.connectionMap = new ConcurrentHashMap<>();
+ this.connFactoryId = connFactoryId;
+ this.eventCodec = eventCodec;
+ this.eventHandler = eventHandler;
+ this.eventListener = eventListener;
+ }
+
+ /**
+ * Creates a new connection.
+ * @param destId a destination identifier of NetworkConnectionService.
+ */
+ @Override
+ public Connection<T> newConnection(final Identifier destId) {
+ final Connection<T> connection = connectionMap.get(destId);
+
+ if (connection == null) {
+ final Connection<T> newConnection = new NetworkConnection<>(this, destId);
+ connectionMap.putIfAbsent(destId, newConnection);
+ return connectionMap.get(destId);
+ }
+ return connection;
+ }
+
+ <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteId) throws NetworkException {
+ return networkService.openLink(remoteId);
+ }
+
+ String getConnectionFactoryId() {
+ return this.connFactoryId;
+ }
+
+ Identifier getSrcId() {
+ return this.networkService.getNetworkConnectionServiceId();
+ }
+
+ EventHandler<Message<T>> getEventHandler() {
+ return eventHandler;
+ }
+
+ LinkListener<Message<T>> getLinkListener() {
+ return eventListener;
+ }
+
+ public void removeConnection(final Identifier remoteId) {
+ connectionMap.remove(remoteId);
+ }
+
+ Codec<T> getCodec() {
+ return eventCodec;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
new file mode 100644
index 0000000..2e4bd43
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default exception handler.
+ */
+public final class NetworkConnectionServiceExceptionHandler implements EventHandler<Exception> {
+
+ private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceExceptionHandler.class.getName());
+
+ @Inject
+ public NetworkConnectionServiceExceptionHandler() {
+ }
+
+ @Override
+ public void onNext(final Exception value) {
+ LOG.log(Level.WARNING, "An exception occurred in transport of NetworkConnectionService: {0}", value);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
new file mode 100644
index 0000000..f5e1f83
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default Network connection service implementation.
+ */
+public final class NetworkConnectionServiceImpl implements NetworkConnectionService {
+
+ private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName());
+
+ /**
+ * An identifier factory registering network connection service id.
+ */
+ private final IdentifierFactory idFactory;
+ /**
+ * A name resolver looking up nameserver.
+ */
+ private final NameResolver nameResolver;
+ /**
+ * A messaging transport.
+ */
+ private final Transport transport;
+ /**
+ * A map of (id of connection factory, a connection factory instance).
+ */
+ private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap;
+ /**
+ * A network connection service identifier.
+ */
+ private Identifier myId;
+ /**
+ * A network connection service message codec.
+ */
+ private final Codec<NetworkConnectionServiceMessage> nsCodec;
+ /**
+ * A network connection service link listener.
+ */
+ private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener;
+ /**
+ * A stage registering identifiers to nameServer.
+ */
+ private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
+ /**
+ * A stage unregistering identifiers from nameServer.
+ */
+ private final EStage<Identifier> nameServiceUnregisteringStage;
+
+ @Inject
+ private NetworkConnectionServiceImpl(
+ @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFactory,
+ @Parameter(NetworkConnectionServicePort.class) final int nsPort,
+ final TransportFactory transportFactory,
+ final NameResolver nameResolver) {
+ this.idFactory = idFactory;
+ this.connFactoryMap = new ConcurrentHashMap<>();
+ this.nsCodec = new NetworkConnectionServiceMessageCodec(idFactory, connFactoryMap);
+ this.nsLinkListener = new NetworkConnectionServiceLinkListener(connFactoryMap);
+ final EventHandler<TransportEvent> recvHandler =
+ new NetworkConnectionServiceReceiveHandler(connFactoryMap, nsCodec);
+ this.nameResolver = nameResolver;
+ this.transport = transportFactory.newInstance(nsPort, recvHandler, recvHandler,
+ new NetworkConnectionServiceExceptionHandler());
+
+ this.nameServiceRegisteringStage = new SingleThreadStage<>(
+ "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
+ @Override
+ public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
+ try {
+ nameResolver.register(tuple.getKey(), tuple.getValue());
+ LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
+ } catch (final Exception ex) {
+ final String msg = "Unable to register " + tuple.getKey() + " with name service";
+ LOG.log(Level.WARNING, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+ }, 5);
+
+ this.nameServiceUnregisteringStage = new SingleThreadStage<>(
+ "NameServiceRegisterer", new EventHandler<Identifier>() {
+ @Override
+ public void onNext(final Identifier id) {
+ try {
+ nameResolver.unregister(id);
+ LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
+ } catch (final Exception ex) {
+ final String msg = "Unable to unregister " + id + " with name service";
+ LOG.log(Level.WARNING, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+ }, 5);
+ }
+
+ @Override
+ public <T> void registerConnectionFactory(final Identifier connFactoryId,
+ final Codec<T> codec,
+ final EventHandler<Message<T>> eventHandler,
+ final LinkListener<Message<T>> linkListener) throws NetworkException {
+ String id = connFactoryId.toString();
+ if (connFactoryMap.get(id) != null) {
+ throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered.");
+ }
+ final ConnectionFactory connFactory = connFactoryMap.putIfAbsent(id,
+ new NetworkConnectionFactory<>(this, id, codec, eventHandler, linkListener));
+
+ if (connFactory != null) {
+ throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered.");
+ }
+ }
+
+ @Override
+ public void unregisterConnectionFactory(final Identifier connFactoryId) {
+ final String id = connFactoryId.toString();
+ final ConnectionFactory connFactory = connFactoryMap.get(id);
+ if (connFactory != null) {
+ final ConnectionFactory cf = connFactoryMap.remove(id);
+ if (cf == null) {
+ LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
+ }
+ } else {
+ LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
+ }
+ }
+
+ /**
+ * Registers a source identifier of NetworkConnectionService.
+ * @param ncsId
+ * @throws Exception
+ */
+ @Override
+ public void registerId(final Identifier ncsId) {
+ LOG.log(Level.INFO, "Registering NetworkConnectionService " + ncsId);
+ this.myId = ncsId;
+ final Tuple<Identifier, InetSocketAddress> tuple =
+ new Tuple<>(ncsId, (InetSocketAddress) this.transport.getLocalAddress());
+ LOG.log(Level.FINEST, "Binding {0} to NetworkConnectionService@({1})",
+ new Object[]{tuple.getKey(), tuple.getValue()});
+ this.nameServiceRegisteringStage.onNext(tuple);
+ }
+
+ /**
+ * Open a channel for destination identifier of NetworkConnectionService.
+ * @param destId
+ * @throws NetworkException
+ */
+ <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier destId) throws NetworkException {
+ try {
+ final SocketAddress address = nameResolver.lookup(destId);
+ if (address == null) {
+ throw new NetworkException("Lookup " + destId + " is null");
+ }
+ return transport.open(address, nsCodec, nsLinkListener);
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new NetworkException(e);
+ }
+ }
+
+ /**
+ * Gets a ConnectionFactory.
+ * @param connFactoryId the identifier of the ConnectionFactory
+ */
+ @Override
+ public <T> ConnectionFactory<T> getConnectionFactory(final Identifier connFactoryId) {
+ final ConnectionFactory<T> connFactory = connFactoryMap.get(connFactoryId.toString());
+ if (connFactory == null) {
+ throw new RuntimeException("Cannot find ConnectionFactory of " + connFactoryId + ".");
+ }
+ return connFactory;
+ }
+
+ @Override
+ public void unregisterId(final Identifier ncsId) {
+ LOG.log(Level.FINEST, "Unbinding {0} to NetworkConnectionService@({1})",
+ new Object[]{ncsId, this.transport.getLocalAddress()});
+ this.myId = null;
+ this.nameServiceUnregisteringStage.onNext(ncsId);
+ }
+
+ @Override
+ public Identifier getNetworkConnectionServiceId() {
+ return this.myId;
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.log(Level.FINE, "Shutting down");
+ this.nameServiceRegisteringStage.close();
+ this.nameServiceUnregisteringStage.close();
+ this.transport.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
new file mode 100644
index 0000000..2c9528c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.Map;
+
+final class NetworkConnectionServiceLinkListener implements LinkListener<NetworkConnectionServiceMessage> {
+
+ private final Map<String, NetworkConnectionFactory> connFactoryMap;
+
+ NetworkConnectionServiceLinkListener(
+ final Map<String, NetworkConnectionFactory> connFactoryMap) {
+ this.connFactoryMap = connFactoryMap;
+ }
+
+ @Override
+ public void onSuccess(final NetworkConnectionServiceMessage message) {
+ final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener();
+ if (listener != null) {
+ listener.onSuccess(message);
+ }
+
+ }
+
+ @Override
+ public void onException(final Throwable cause, final SocketAddress remoteAddress,
+ final NetworkConnectionServiceMessage message) {
+ final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener();
+ if (listener != null) {
+ listener.onException(cause, remoteAddress, message);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
new file mode 100644
index 0000000..9ce10c1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.Identifier;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * NetworkConnectionServiceMessage implementation.
+ * This is a wrapper message of message type <T>.
+ */
+final class NetworkConnectionServiceMessage<T> implements Message<T> {
+
+ private final List<T> messages;
+ private SocketAddress remoteAddr;
+ private final String connFactoryId;
+ private final Identifier srcId;
+ private final Identifier destId;
+
+ /**
+ * Constructs a network connection service message.
+ *
+ * @param connFactoryId the connection factory identifier
+ * @param srcId the source identifier of NetworkConnectionService
+ * @param destId the destination identifier of NetworkConnectionService
+ * @param messages the list of messages
+ */
+ public NetworkConnectionServiceMessage(
+ final String connFactoryId,
+ final Identifier srcId,
+ final Identifier destId,
+ final List<T> messages) {
+ this.connFactoryId = connFactoryId;
+ this.srcId = srcId;
+ this.destId = destId;
+ this.messages = messages;
+ }
+
+ void setRemoteAddress(final SocketAddress remoteAddress) {
+ this.remoteAddr = remoteAddress;
+ }
+
+ /**
+ * Gets a destination identifier.
+ *
+ * @return a remote id
+ */
+ @Override
+ public Identifier getDestId() {
+ return destId;
+ }
+
+ /**
+ * Gets a connection factory identifier.
+ *
+ * @return a connection factory id
+ */
+ public String getConnectionFactoryId() {
+ return connFactoryId;
+ }
+
+
+ /**
+ * Gets a source identifier of NetworkConnectionService.
+ *
+ * @return a source id
+ */
+ @Override
+ public Identifier getSrcId() {
+ return srcId;
+ }
+
+ @Override
+ public List<T> getData() {
+ return messages;
+ }
+
+ /**
+ * Returns a string representation of this object.
+ *
+ * @return a string representation of this object
+ */
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("NSMessage");
+ builder.append(" remoteID=");
+ builder.append(destId);
+ builder.append(" message=[| ");
+ for (T message : messages) {
+ builder.append(message + " |");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
new file mode 100644
index 0000000..bdccff2
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * DefaultNetworkMessageCodec implementation.
+ * This codec encodes/decodes NetworkConnectionServiceMessageImpl according to the type <T>.
+ */
+final class NetworkConnectionServiceMessageCodec implements Codec<NetworkConnectionServiceMessage> {
+
+ private final IdentifierFactory factory;
+ /**
+ * Contains entries of (id of connection factory, instance of connection factory).
+ */
+ private final Map<String, NetworkConnectionFactory> connFactoryMap;
+ /**
+ * Contains entries of (instance of codec, boolean whether the codec is streaming or not).
+ */
+ private final ConcurrentMap<Codec, Boolean> isStreamingCodecMap;
+
+ /**
+ * Constructs a network connection service message codec.
+ */
+ NetworkConnectionServiceMessageCodec(
+ final IdentifierFactory factory,
+ final Map<String, NetworkConnectionFactory> connFactoryMap) {
+ this.factory = factory;
+ this.connFactoryMap = connFactoryMap;
+ this.isStreamingCodecMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Encodes a network connection service message to bytes.
+ * @param obj a message
+ * @return bytes
+ */
+ @Override
+ public byte[] encode(final NetworkConnectionServiceMessage obj) {
+ final Codec codec = connFactoryMap.get(obj.getConnectionFactoryId()).getCodec();
+ Boolean isStreamingCodec = isStreamingCodecMap.get(codec);
+ if (isStreamingCodec == null) {
+ isStreamingCodec = codec instanceof StreamingCodec;
+ isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec);
+ }
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (final DataOutputStream daos = new DataOutputStream(baos)) {
+ daos.writeUTF(obj.getConnectionFactoryId());
+ daos.writeUTF(obj.getSrcId().toString());
+ daos.writeUTF(obj.getDestId().toString());
+ daos.writeInt(obj.getData().size());
+
+ if (isStreamingCodec) {
+ for (final Object rec : obj.getData()) {
+ ((StreamingCodec) codec).encodeToStream(rec, daos);
+ }
+ } else {
+ final Iterable dataList = obj.getData();
+ for (final Object message : dataList) {
+ final byte[] bytes = codec.encode(message);
+ daos.writeInt(bytes.length);
+ daos.write(bytes);
+ }
+ }
+ return baos.toByteArray();
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+ /**
+ * Decodes a network connection service message from bytes.
+ *
+ * @param data bytes
+ * @return a message
+ */
+ @Override
+ public NetworkConnectionServiceMessage decode(final byte[] data) {
+ try (final ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+ try (final DataInputStream dais = new DataInputStream(bais)) {
+ final String connFactoryId = dais.readUTF();
+ final Identifier srcId = factory.getNewInstance(dais.readUTF());
+ final Identifier destId = factory.getNewInstance(dais.readUTF());
+ final int size = dais.readInt();
+ final List list = new ArrayList(size);
+ final Codec codec = connFactoryMap.get(connFactoryId).getCodec();
+ Boolean isStreamingCodec = isStreamingCodecMap.get(codec);
+ if (isStreamingCodec == null) {
+ isStreamingCodec = codec instanceof StreamingCodec;
+ isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec);
+ }
+
+ if (isStreamingCodec) {
+ for (int i = 0; i < size; i++) {
+ list.add(((StreamingCodec) codec).decodeFromStream(dais));
+ }
+ } else {
+ for (int i = 0; i < size; i++) {
+ final int byteSize = dais.readInt();
+ final byte[] bytes = new byte[byteSize];
+ dais.read(bytes);
+ list.add(codec.decode(bytes));
+ }
+ }
+
+ return new NetworkConnectionServiceMessage(
+ connFactoryId,
+ srcId,
+ destId,
+ list
+ );
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
new file mode 100644
index 0000000..1875e79
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+
+import java.util.Map;
+
+/**
+ * NetworkConnectionService event handler.
+ * It dispatches events to the corresponding eventHandler.
+ */
+final class NetworkConnectionServiceReceiveHandler implements EventHandler<TransportEvent> {
+
+ private final Map<String, NetworkConnectionFactory> connFactoryMap;
+ private final Codec<NetworkConnectionServiceMessage> codec;
+
+ NetworkConnectionServiceReceiveHandler(
+ final Map<String, NetworkConnectionFactory> connFactoryMap,
+ final Codec<NetworkConnectionServiceMessage> codec) {
+ this.connFactoryMap = connFactoryMap;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(final TransportEvent transportEvent) {
+ final NetworkConnectionServiceMessage nsMessage = codec.decode(transportEvent.getData());
+ nsMessage.setRemoteAddress(transportEvent.getRemoteAddress());
+ final NetworkConnectionFactory connFactory = connFactoryMap.get(nsMessage.getConnectionFactoryId());
+ final EventHandler eventHandler = connFactory.getEventHandler();
+ eventHandler.onNext(nsMessage);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
index c79f88a..a2f5f2a 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -39,7 +39,7 @@ public class NetworkServiceParameters {
public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
}
- @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "7070")
+ @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "0")
public static class NetworkServicePort implements Name<Integer> {
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
new file mode 100644
index 0000000..540fe80
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+/**
+ * TaskStop event handler for unregistering NetworkConnectionService.
+ * Users have to bind this handler into ServiceConfiguration.ON_TASK_STOP.
+ */
+public final class UnbindNetworkConnectionServiceFromTask implements EventHandler<TaskStop> {
+
+ private final NetworkConnectionService ncs;
+ private final IdentifierFactory idFac;
+
+ @Inject
+ public UnbindNetworkConnectionServiceFromTask(
+ final NetworkConnectionService ncs,
+ @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) {
+ this.ncs = ncs;
+ this.idFac = idFac;
+ }
+
+ @Override
+ public void onNext(final TaskStop task) {
+ this.ncs.unregisterId(this.idFac.getNewInstance(task.getId()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
new file mode 100644
index 0000000..67c3e96
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl.config;
+
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+@NamedParameter(doc = "identifier factory for the service", short_name = "ncsfactory",
+ default_class = StringIdentifierFactory.class)
+public final class NetworkConnectionServiceIdFactory implements Name<IdentifierFactory> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
new file mode 100644
index 0000000..abaf840
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl.config;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "port for the network connection service", short_name = "ncsport", default_value = "0")
+public final class NetworkConnectionServicePort implements Name<Integer> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
new file mode 100644
index 0000000..e54151a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * TODO: Document.
+ */
+package org.apache.reef.io.network.impl.config;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
new file mode 100644
index 0000000..ddf4bfd
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.util.StringCodec;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.services.network.util.*;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default Network connection service test.
+ */
+public class NetworkConnectionServiceTest {
+ private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName());
+
+ private final LocalAddressProvider localAddressProvider;
+ private final String localAddress;
+ private final Identifier groupCommClientId;
+ private final Identifier shuffleClientId;
+
+ public NetworkConnectionServiceTest() throws InjectionException {
+ localAddressProvider = LocalAddressProviderFactory.getInstance();
+ localAddress = localAddressProvider.getLocalAddress();
+
+ final IdentifierFactory idFac = new StringIdentifierFactory();
+ this.groupCommClientId = idFac.getNewInstance("groupComm");
+ this.shuffleClientId = idFac.getNewInstance("shuffle");
+ }
+
+ @Rule
+ public TestName name = new TestName();
+
+ private void runMessagingNetworkConnectionService(Codec<String> codec) throws Exception {
+ final int numMessages = 2000;
+ final Monitor monitor = new Monitor();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+ messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write("hello" + count);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+
+ conn.close();
+ messagingTest.close();
+ }
+
+ /**
+ * NetworkConnectionService messaging test.
+ */
+ @Test
+ public void testMessagingNetworkConnectionService() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ runMessagingNetworkConnectionService(new StringCodec());
+ }
+
+ /**
+ * NetworkConnectionService streaming messaging test.
+ */
+ @Test
+ public void testStreamingMessagingNetworkConnectionService() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ runMessagingNetworkConnectionService(new StreamingStringCodec());
+ }
+
+ public void runNetworkConnServiceWithMultipleConnFactories(Codec<String> stringCodec, Codec<Integer> integerCodec) throws Exception {
+ final ExecutorService executor = Executors.newFixedThreadPool(5);
+
+ final int groupcommMessages = 1000;
+ final Monitor monitor = new Monitor();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+
+ messagingTest.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec);
+
+ final int shuffleMessges = 2000;
+ final Monitor monitor2 = new Monitor();
+ messagingTest.registerTestConnectionFactory(shuffleClientId, shuffleMessges, monitor2, integerCodec);
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+ try {
+ conn.open();
+ for (int count = 0; count < groupcommMessages; ++count) {
+ // send messages to the receiver.
+ conn.write("hello" + count);
+ }
+ monitor.mwait();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final Connection<Integer> conn = messagingTest.getConnectionFromSenderToReceiver(shuffleClientId);
+ try {
+ conn.open();
+ for (int count = 0; count < shuffleMessges; ++count) {
+ // send messages to the receiver.
+ conn.write(count);
+ }
+ monitor2.mwait();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ monitor.mwait();
+ monitor2.mwait();
+ executor.shutdown();
+ messagingTest.close();
+ }
+
+ /**
+ * Test NetworkService registering multiple connection factories.
+ */
+ @Test
+ public void testMultipleConnectionFactoriesTest() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>());
+ }
+
+ /**
+ * Test NetworkService registering multiple connection factories with Streamingcodec.
+ */
+ @Test
+ public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec());
+ }
+
+ /**
+ * NetworkService messaging rate benchmark.
+ */
+ @Test
+ public void testMessagingNetworkConnServiceRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+ final Codec<String> codec = new StringCodec();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+ messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+
+ long start = System.currentTimeMillis();
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ final long end = System.currentTimeMillis();
+
+ final double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars
+ messagingTest.close();
+ }
+ }
+
+ /**
+ * NetworkService messaging rate benchmark.
+ */
+ @Test
+ public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+ final int numThreads = 4;
+ final int size = 2000;
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final int totalNumMessages = numMessages * numThreads;
+
+ final ExecutorService e = Executors.newCachedThreadPool();
+ for (int t = 0; t < numThreads; t++) {
+ final int tt = t;
+
+ e.submit(new Runnable() {
+ public void run() {
+ try {
+ final Monitor monitor = new Monitor();
+ final Codec<String> codec = new StringCodec();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+ messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ messagingTest.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ // start and time
+ final long start = System.currentTimeMillis();
+ final Object ignore = new Object();
+ for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+ e.shutdown();
+ e.awaitTermination(100, TimeUnit.SECONDS);
+ final long end = System.currentTimeMillis();
+ final double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ }
+
+ @Test
+ public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+ final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final int numThreads = 2;
+ final int totalNumMessages = numMessages * numThreads;
+ final Monitor monitor = new Monitor();
+ final Codec<String> codec = new StringCodec();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+ messagingTest.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec);
+
+ final ExecutorService e = Executors.newCachedThreadPool();
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+ final long start = System.currentTimeMillis();
+ for (int i = 0; i < numThreads; i++) {
+ e.submit(new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ e.shutdown();
+ e.awaitTermination(30, TimeUnit.SECONDS);
+ monitor.mwait();
+ final long end = System.currentTimeMillis();
+ final double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ conn.close();
+ messagingTest.close();
+ }
+ }
+
+ /**
+ * NetworkService messaging rate benchmark.
+ */
+ @Test
+ public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ final int batchSize = 1024 * 1024;
+ final int[] messageSizes = {32, 64, 512};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+ final Codec<String> codec = new StringCodec();
+ final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+ messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+ final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+
+ final long start = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ final StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < batchSize / size; j++) {
+ sb.append(message);
+ }
+ conn.open();
+ conn.write(sb.toString());
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ final long end = System.currentTimeMillis();
+ final double runtime = ((double) end - start) / 1000;
+ final long numAppMessages = numMessages * batchSize / size;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
+ messagingTest.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index c1f76b8..2bb75ae 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -387,12 +387,8 @@ public class NetworkServiceTest {
@Override
public void run() {
- try {
- for (int i = 0; i < numMessages; i++) {
- conn.write(message);
- }
- } catch (NetworkException e) {
- e.printStackTrace();
+ for (int i = 0; i < numMessages; i++) {
+ conn.write(message);
}
}
});
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
new file mode 100644
index 0000000..ba0cfe4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for NetworkConnectionService test.
+ */
+public final class NetworkMessagingTest {
+ private static final Logger LOG = Logger.getLogger(NetworkMessagingTest.class.getName());
+
+ private final IdentifierFactory factory;
+ private final NetworkConnectionService receiverNetworkConnService;
+ private final NetworkConnectionService senderNetworkConnService;
+ private final String receiver;
+ private final String sender;
+ private final NameServer nameServer;
+
+ public NetworkMessagingTest(final String localAddress) throws InjectionException {
+ // name server
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ this.nameServer = injector.getInstance(NameServer.class);
+ final Configuration netConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+ .build();
+
+ LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+ // network service for receiver
+ this.receiver = "receiver";
+ final Injector injectorReceiver = injector.forkInjector(netConf);
+ this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+ this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+ this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+ // network service for sender
+ this.sender = "sender";
+ LOG.log(Level.FINEST, "=== Test network connection service sender start");
+ final Injector injectorSender = injector.forkInjector(netConf);
+ senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+ senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+ }
+
+ public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+ final int numMessages, final Monitor monitor,
+ final Codec<T> codec) throws NetworkException {
+ receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+ senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+ }
+
+ public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+ final Identifier destId = factory.getNewInstance(receiver);
+ return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+ }
+
+ public void close() throws Exception {
+ senderNetworkConnService.close();
+ receiverNetworkConnService.close();
+ nameServer.close();
+ }
+
+ public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+ private final int expected;
+ private final Monitor monitor;
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public MessageHandler(final Monitor monitor,
+ final int expected) {
+ this.monitor = monitor;
+ this.expected = expected;
+ }
+
+ @Override
+ public void onNext(Message<T> value) {
+ count.incrementAndGet();
+ LOG.log(Level.FINE, "Count: {0}", count.get());
+ LOG.log(Level.FINE,
+ "OUT: {0} received {1} from {2} to {3}",
+ new Object[]{value, value.getSrcId(), value.getDestId()});
+
+ for (final T obj : value.getData()) {
+ LOG.log(Level.FINE, "OUT: data: {0}", obj);
+ }
+
+ if (count.get() == expected) {
+ monitor.mnotify();
+ }
+ }
+ }
+
+ public static final class TestListener<T> implements LinkListener<Message<T>> {
+ @Override
+ public void onSuccess(Message<T> message) {
+ LOG.log(Level.FINE, "success: " + message);
+ }
+ @Override
+ public void onException(Throwable cause, SocketAddress remoteAddress, Message<T> message) {
+ LOG.log(Level.WARNING, "exception: " + cause + message);
+ throw new RuntimeException(cause);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
new file mode 100644
index 0000000..eaec90f
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+public class StreamingIntegerCodec implements StreamingCodec<Integer> {
+
+ @Override
+ public void encodeToStream(Integer obj, DataOutputStream stream) {
+ try {
+ stream.writeInt(obj);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Integer decodeFromStream(DataInputStream stream) {
+ try {
+ return stream.readInt();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Integer decode(byte[] data) {
+ return null;
+ }
+
+ @Override
+ public byte[] encode(Integer obj) {
+ return new byte[0];
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
new file mode 100644
index 0000000..497c7fa
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+public class StreamingStringCodec implements StreamingCodec<String> {
+ @Override
+ public byte[] encode(String obj) {
+ return obj.getBytes();
+ }
+
+ @Override
+ public String decode(byte[] buf) {
+ return new String(buf);
+ }
+
+ @Override
+ public void encodeToStream(String obj, DataOutputStream stream) {
+ try {
+ stream.writeUTF(obj);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String decodeFromStream(DataInputStream stream) {
+ try {
+ return stream.readUTF();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}