You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:00 UTC
[27/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
new file mode 100644
index 0000000..f22f970
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessageCodec.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.reef.io.network.exception.NetworkRuntimeException;
+import org.apache.reef.io.network.proto.ReefNetworkServiceProtos.NSMessagePBuf;
+import org.apache.reef.io.network.proto.ReefNetworkServiceProtos.NSRecordPBuf;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Network service message codec
+ *
+ * @param <T> type
+ */
+public class NSMessageCodec<T> implements Codec<NSMessage<T>> {
+
+ private final Codec<T> codec;
+ private final IdentifierFactory factory;
+ private final boolean isStreamingCodec;
+
+ /**
+ * Constructs a network service message codec
+ *
+ * @param codec a codec
+ * @param factory an identifier factory
+ */
+ public NSMessageCodec(final Codec<T> codec, final IdentifierFactory factory) {
+ this.codec = codec;
+ this.factory = factory;
+ this.isStreamingCodec = codec instanceof StreamingCodec;
+ }
+
+ /**
+ * Encodes a network service message to bytes
+ *
+ * @param obj a message
+ * @return bytes
+ */
+ @Override
+ public byte[] encode(final NSMessage<T> obj) {
+ if (isStreamingCodec) {
+ final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (DataOutputStream daos = new DataOutputStream(baos)) {
+ daos.writeUTF(obj.getSrcId().toString());
+ daos.writeUTF(obj.getDestId().toString());
+ daos.writeInt(obj.getData().size());
+ for (final T rec : obj.getData()) {
+ streamingCodec.encodeToStream(rec, daos);
+ }
+ }
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ } else {
+ final NSMessagePBuf.Builder pbuf = NSMessagePBuf.newBuilder();
+ pbuf.setSrcid(obj.getSrcId().toString());
+ pbuf.setDestid(obj.getDestId().toString());
+ for (final T rec : obj.getData()) {
+ final NSRecordPBuf.Builder rbuf = NSRecordPBuf.newBuilder();
+ rbuf.setData(ByteString.copyFrom(codec.encode(rec)));
+ pbuf.addMsgs(rbuf);
+ }
+ return pbuf.build().toByteArray();
+ }
+ }
+
+ /**
+ * Decodes a network service message from bytes
+ *
+ * @param buf bytes
+ * @return a message
+ */
+ @Override
+ public NSMessage<T> decode(final byte[] buf) {
+ if (isStreamingCodec) {
+ final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
+ try (DataInputStream dais = new DataInputStream(bais)) {
+ final Identifier srcId = factory.getNewInstance(dais.readUTF());
+ final Identifier destId = factory.getNewInstance(dais.readUTF());
+ final int size = dais.readInt();
+ final List<T> list = new ArrayList<T>(size);
+ for (int i = 0; i < size; i++) {
+ list.add(streamingCodec.decodeFromStream(dais));
+ }
+ return new NSMessage<>(srcId, destId, list);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ } else {
+ NSMessagePBuf pbuf;
+ try {
+ pbuf = NSMessagePBuf.parseFrom(buf);
+ } catch (final InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ throw new NetworkRuntimeException(e);
+ }
+ final List<T> list = new ArrayList<T>();
+ for (final NSRecordPBuf rbuf : pbuf.getMsgsList()) {
+ list.add(codec.decode(rbuf.getData().toByteArray()));
+ }
+ return new NSMessage<T>(factory.getNewInstance(pbuf.getSrcid()), factory.getNewInstance(pbuf.getDestid()), list);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
new file mode 100644
index 0000000..f569fcd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NameServiceCloseHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class NameServiceCloseHandler implements EventHandler<ContextStop> {
+
+ private static final Logger LOG = Logger.getLogger(NameServiceCloseHandler.class.getName());
+
+ private final AutoCloseable toClose;
+
+ @Inject
+ public NameServiceCloseHandler(final NameServer toClose) {
+ this.toClose = toClose;
+ }
+
+ @Override
+ public void onNext(final ContextStop event) {
+ try {
+ LOG.log(Level.FINEST, "Closing {0}", this.toClose);
+ this.toClose.close();
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Exception while closing " + this.toClose, ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
new file mode 100644
index 0000000..6120193
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.naming.Naming;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.ConnectionFactory;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.io.network.naming.NameCache;
+import org.apache.reef.io.network.naming.NameClient;
+import org.apache.reef.io.network.naming.NameLookupClient;
+import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.*;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Network service for Task
+ */
+public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
+
+ private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
+
+ private static final int retryCount;
+ private static final int retryTimeout;
+
+ static {
+ try {
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
+ retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+ } catch (final InjectionException ex) {
+ final String msg = "Exception while trying to find default values for retryCount & Timeout";
+ LOG.log(Level.SEVERE, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+
+ private final IdentifierFactory factory;
+ private final Codec<T> codec;
+ private final Transport transport;
+ private final NameClient nameClient;
+ private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
+ private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
+ private final EStage<Identifier> nameServiceUnregisteringStage;
+ private Identifier myId;
+
+ public NetworkService(final IdentifierFactory factory,
+ final int nsPort,
+ final String nameServerAddr,
+ final int nameServerPort,
+ final Codec<T> codec,
+ final TransportFactory tpFactory,
+ final EventHandler<Message<T>> recvHandler,
+ final EventHandler<Exception> exHandler) {
+ this(factory, nsPort, nameServerAddr, nameServerPort,
+ retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler);
+ }
+
+ @Inject
+ public NetworkService(
+ final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory,
+ final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort,
+ final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr,
+ final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort,
+ final @Parameter(NameLookupClient.RetryCount.class) int retryCount,
+ final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout,
+ final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec,
+ final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory,
+ final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler,
+ final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) {
+
+ this.factory = factory;
+ this.codec = codec;
+ this.transport = tpFactory.create(nsPort,
+ new LoggingEventHandler<TransportEvent>(),
+ new MessageHandler<T>(recvHandler, codec, factory), exHandler);
+
+ this.nameClient = new NameClient(nameServerAddr, nameServerPort,
+ factory, retryCount, retryTimeout, new NameCache(30000));
+
+ this.nameServiceRegisteringStage = new SingleThreadStage<>(
+ "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
+ @Override
+ public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
+ try {
+ nameClient.register(tuple.getKey(), tuple.getValue());
+ LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
+ } catch (final Exception ex) {
+ final String msg = "Unable to register " + tuple.getKey() + "with name service";
+ LOG.log(Level.WARNING, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+ }, 5);
+
+ this.nameServiceUnregisteringStage = new SingleThreadStage<>(
+ "NameServiceRegisterer", new EventHandler<Identifier>() {
+ @Override
+ public void onNext(final Identifier id) {
+ try {
+ nameClient.unregister(id);
+ LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
+ } catch (final Exception ex) {
+ final String msg = "Unable to unregister " + id + " with name service";
+ LOG.log(Level.WARNING, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+ }, 5);
+ }
+
+ public void registerId(final Identifier id) {
+ this.myId = id;
+ final Tuple<Identifier, InetSocketAddress> tuple =
+ new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
+ LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})",
+ new Object[]{tuple.getKey(), tuple.getValue()});
+ this.nameServiceRegisteringStage.onNext(tuple);
+ }
+
+ public void unregisterId(Identifier id) {
+ this.myId = null;
+ LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})",
+ new Object[]{id, this.transport.getLocalAddress()});
+ this.nameServiceUnregisteringStage.onNext(id);
+ }
+
+ public Identifier getMyId() {
+ return this.myId;
+ }
+
+ public Transport getTransport() {
+ return this.transport;
+ }
+
+ public Codec<T> getCodec() {
+ return this.codec;
+ }
+
+ public Naming getNameClient() {
+ return this.nameClient;
+ }
+
+ public IdentifierFactory getIdentifierFactory() {
+ return this.factory;
+ }
+
+ void remove(final Identifier id) {
+ this.idToConnMap.remove(id);
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.log(Level.FINE, "Shutting down");
+ this.transport.close();
+ this.nameClient.close();
+ }
+
+ @Override
+ public Connection<T> newConnection(final Identifier destId) {
+
+ if (this.myId == null) {
+ throw new RuntimeException(
+ "Trying to establish a connection from a Network Service that is not bound to any task");
+ }
+
+ final Connection<T> conn = this.idToConnMap.get(destId);
+ if (conn != null) {
+ return conn;
+ }
+
+ final Connection<T> newConnection = new NSConnection<T>(
+ this.myId, destId, new LinkListener<T>() {
+ @Override
+ public void messageReceived(final Object message) {
+ }
+ }, this);
+
+ final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
+ return existing == null ? newConnection : existing;
+ }
+}
+
+class MessageHandler<T> implements EventHandler<TransportEvent> {
+
+ private final EventHandler<Message<T>> handler;
+ private final NSMessageCodec<T> codec;
+
+ public MessageHandler(final EventHandler<Message<T>> handler,
+ final Codec<T> codec, final IdentifierFactory factory) {
+ this.handler = handler;
+ this.codec = new NSMessageCodec<T>(codec, factory);
+ }
+
+ @Override
+ public void onNext(final TransportEvent value) {
+ final byte[] data = value.getData();
+ final NSMessage<T> obj = this.codec.decode(data);
+ this.handler.onNext(obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
new file mode 100644
index 0000000..6e17922
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceClosingHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+public class NetworkServiceClosingHandler implements EventHandler<ContextStop> {
+ private final NetworkService<?> networkService;
+
+ @Inject
+ public NetworkServiceClosingHandler(final NetworkService<?> networkService) {
+ this.networkService = networkService;
+ }
+
+ @Override
+ public void onNext(ContextStop arg0) {
+ try {
+ networkService.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while closing NetworkService", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
new file mode 100644
index 0000000..a31c5de
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+
+public class NetworkServiceParameters {
+
+ @NamedParameter
+ public static class TaskId implements Name<String> {
+
+ }
+
+ @NamedParameter(doc = "identifier factory for the service", short_name = "factory", default_class = StringIdentifierFactory.class)
+ public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
+ }
+
+ @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "7070")
+ public static class NetworkServicePort implements Name<Integer> {
+ }
+
+ @NamedParameter(doc = "codec for the network service", short_name = "nscodec")
+ public static class NetworkServiceCodec implements Name<Codec<?>> {
+ }
+
+ @NamedParameter(doc = "transport factory for the network service", short_name = "nstransportfactory", default_class = MessagingTransportFactory.class)
+ public static class NetworkServiceTransportFactory implements Name<TransportFactory> {
+ }
+
+ @NamedParameter(doc = "network receive handler for the network service", short_name = "nshandler")
+ public static class NetworkServiceHandler implements Name<EventHandler<?>> {
+ }
+
+ @NamedParameter(doc = "network exception handler for the network service", short_name = "exhandler")
+ public static class NetworkServiceExceptionHandler implements Name<EventHandler<?>> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
new file mode 100644
index 0000000..ac669af
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/StreamingCodec.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A codec that can make serialization more efficient when an object has to be
+ * codec'ed through a chain of codecs
+ */
+public interface StreamingCodec<T> extends Codec<T> {
+
+ void encodeToStream(T obj, DataOutputStream stream);
+
+ T decodeFromStream(DataInputStream stream);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
new file mode 100644
index 0000000..df337d9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNSFromTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+public class UnbindNSFromTask implements EventHandler<TaskStop> {
+
+ private final NetworkService<?> ns;
+ private final IdentifierFactory idFac;
+
+ @Inject
+ public UnbindNSFromTask(
+ final NetworkService<?> ns,
+ final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
+ this.ns = ns;
+ this.idFac = idFac;
+ }
+
+ @Override
+ public void onNext(final TaskStop task) {
+ this.ns.unregisterId(this.idFac.getNewInstance(task.getId()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
new file mode 100644
index 0000000..fbdc2ef
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
new file mode 100644
index 0000000..c7792fd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameAssignmentTuple.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+
+/**
+ * An implementation of the NameAssignment interface
+ */
+public class NameAssignmentTuple implements NameAssignment {
+
+ private final Identifier id;
+ private final InetSocketAddress addr;
+
+ /**
+ * Constructs a name assignment tuple
+ *
+ * @param id an identifier
+ * @param addr an Internet socket address
+ */
+ public NameAssignmentTuple(Identifier id, InetSocketAddress addr) {
+ this.id = id;
+ this.addr = addr;
+ }
+
+ /**
+ * Gets an identifier
+ *
+ * @return an identifier
+ */
+ @Override
+ public Identifier getIdentifier() {
+ return id;
+ }
+
+ /**
+ * Gets an address
+ *
+ * @return an Internet socket address
+ */
+ @Override
+ public InetSocketAddress getAddress() {
+ return addr;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
new file mode 100644
index 0000000..384bfe4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import com.google.common.cache.CacheBuilder;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.wake.Identifier;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Naming cache implementation
+ */
+public class NameCache implements Cache<Identifier, InetSocketAddress> {
+
+ private final com.google.common.cache.Cache<Identifier, InetSocketAddress> cache;
+
+ /**
+ * Constructs a naming cache
+ *
+ * @param timeout a cache entry timeout after access
+ */
+ public NameCache(long timeout) {
+ cache = CacheBuilder.newBuilder()
+ .expireAfterWrite(timeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ /**
+ * Gets an address for an identifier
+ *
+ * @param key an identifier
+ * @param valueFetcher a callable to load a value for the corresponding identifier
+ * @return an Internet socket address
+ * @throws ExecutionException
+ */
+ @Override
+ public InetSocketAddress get(Identifier key,
+ Callable<InetSocketAddress> valueFetcher) throws ExecutionException {
+ return cache.get(key, valueFetcher);
+ }
+
+ /**
+ * Invalidates the entry for an identifier
+ *
+ * @param key an identifier
+ */
+ @Override
+ public void invalidate(Identifier key) {
+ cache.invalidate(key);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
new file mode 100644
index 0000000..79b4a92
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.Naming;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming client
+ */
+public class NameClient implements Stage, Naming {
+ private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
+
+ private NameLookupClient lookupClient;
+ private NameRegistryClient registryClient;
+ private Transport transport;
+
+ /**
+ * Constructs a naming client
+ *
+ * @param serverAddr a server address
+ * @param serverPort a server port number
+ * @param factory an identifier factory
+ * @param cache a cache
+ */
+ public NameClient(String serverAddr, int serverPort,
+ IdentifierFactory factory, int retryCount, int retryTimeout,
+ Cache<Identifier, InetSocketAddress> cache) {
+ this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
+ }
+
+ /**
+ * Constructs a naming client
+ *
+ * @param serverAddr a server address
+ * @param serverPort a server port number
+ * @param timeout timeout in ms
+ * @param factory an identifier factory
+ * @param cache a cache
+ */
+ public NameClient(final String serverAddr, final int serverPort, final long timeout,
+ final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+ final Cache<Identifier, InetSocketAddress> cache) {
+
+ final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
+ final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
+ final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+
+ this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+ new SyncStage<>(new NamingClientEventHandler(
+ new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
+ null, retryCount, retryTimeout);
+
+ this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout,
+ factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache);
+
+ this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout,
+ factory, replyRegisterQueue, this.transport);
+ }
+
+ /**
+ * Registers an (identifier, address) mapping
+ *
+ * @param id an identifier
+ * @param addr an Internet socket address
+ */
+ @Override
+ public void register(final Identifier id, final InetSocketAddress addr)
+ throws Exception {
+ LOG.log(Level.FINE, "Refister {0} : {1}", new Object[]{id, addr});
+ this.registryClient.register(id, addr);
+ }
+
+ /**
+ * Unregisters an identifier
+ *
+ * @param id an identifier
+ */
+ @Override
+ public void unregister(final Identifier id) throws IOException {
+ this.registryClient.unregister(id);
+ }
+
+ /**
+ * Finds an address for an identifier
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ */
+ @Override
+ public InetSocketAddress lookup(final Identifier id) throws Exception {
+ return this.lookupClient.lookup(id);
+ }
+
+ /**
+ * Retrieves an address for an identifier remotely
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ * @throws Exception
+ */
+ public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
+ return this.lookupClient.remoteLookup(id);
+ }
+
+ /**
+ * Closes resources
+ */
+ @Override
+ public void close() throws Exception {
+
+ if (this.lookupClient != null) {
+ this.lookupClient.close();
+ }
+
+ if (this.registryClient != null) {
+ this.registryClient.close();
+ }
+
+ if (this.transport != null) {
+ this.transport.close();
+ }
+ }
+}
+
+/**
+ * Naming client transport event handler
+ */
+class NamingClientEventHandler implements EventHandler<TransportEvent> {
+
+ private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName());
+
+ private final EventHandler<NamingMessage> handler;
+ private final Codec<NamingMessage> codec;
+
+ public NamingClientEventHandler(
+ final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
+ this.handler = handler;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(final TransportEvent value) {
+ LOG.log(Level.FINE, "Transport: ", value);
+ this.handler.onNext(this.codec.decode(value.getData()));
+ }
+}
+
+/**
+ * Naming response message handler
+ */
+class NamingResponseHandler implements EventHandler<NamingMessage> {
+
+ private final BlockingQueue<NamingLookupResponse> replyLookupQueue;
+ private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue;
+
+ NamingResponseHandler(BlockingQueue<NamingLookupResponse> replyLookupQueue,
+ BlockingQueue<NamingRegisterResponse> replyRegisterQueue) {
+ this.replyLookupQueue = replyLookupQueue;
+ this.replyRegisterQueue = replyRegisterQueue;
+ }
+
+ @Override
+ public void onNext(NamingMessage value) {
+ if (value instanceof NamingLookupResponse) {
+ replyLookupQueue.offer((NamingLookupResponse) value);
+ } else if (value instanceof NamingRegisterResponse) {
+ replyRegisterQueue.offer((NamingRegisterResponse) value);
+ } else {
+ throw new NamingRuntimeException("Unknown naming response message");
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
new file mode 100644
index 0000000..31b7fca
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.naming.NamingLookup;
+import org.apache.reef.io.network.Cache;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
+import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming lookup client
+ */
+public class NameLookupClient implements Stage, NamingLookup {
+
+ private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName());
+ private final SocketAddress serverSocketAddr;
+ private final Transport transport;
+ private final Codec<NamingMessage> codec;
+ private final BlockingQueue<NamingLookupResponse> replyQueue;
+ private final long timeout;
+ private final Cache<Identifier, InetSocketAddress> cache;
+ private final int retryCount;
+ private final int retryTimeout;
+
+ /**
+ * Constructs a naming lookup client
+ *
+ * @param serverAddr a server address
+ * @param serverPort a server port number
+ * @param factory an identifier factory
+ * @param cache an cache
+ */
+ public NameLookupClient(final String serverAddr, final int serverPort,
+ final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+ final Cache<Identifier, InetSocketAddress> cache) {
+ this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
+ }
+
+ /**
+ * Constructs a naming lookup client
+ *
+ * @param serverAddr a server address
+ * @param serverPort a server port number
+ * @param timeout request timeout in ms
+ * @param factory an identifier factory
+ * @param cache an cache
+ */
+ public NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
+ final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+ final Cache<Identifier, InetSocketAddress> cache) {
+
+ this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+ this.timeout = timeout;
+ this.cache = cache;
+ this.codec = NamingCodecFactory.createLookupCodec(factory);
+ this.replyQueue = new LinkedBlockingQueue<>();
+
+ this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+ new SyncStage<>(new NamingLookupClientHandler(
+ new NamingLookupResponseHandler(this.replyQueue), this.codec)),
+ null, retryCount, retryTimeout);
+
+ this.retryCount = retryCount;
+ this.retryTimeout = retryTimeout;
+ }
+
+ NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
+ final IdentifierFactory factory, final int retryCount, final int retryTimeout,
+ final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport,
+ final Cache<Identifier, InetSocketAddress> cache) {
+
+ this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+ this.timeout = timeout;
+ this.cache = cache;
+ this.codec = NamingCodecFactory.createFullCodec(factory);
+ this.replyQueue = replyQueue;
+ this.transport = transport;
+ this.retryCount = retryCount;
+ this.retryTimeout = retryTimeout;
+ }
+
+ /**
+ * Finds an address for an identifier
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ */
+ @Override
+ public InetSocketAddress lookup(final Identifier id) throws Exception {
+
+ return cache.get(id, new Callable<InetSocketAddress>() {
+
+ @Override
+ public InetSocketAddress call() throws Exception {
+ final int origRetryCount = NameLookupClient.this.retryCount;
+ int retryCount = origRetryCount;
+ while (true) {
+ try {
+ return remoteLookup(id);
+ } catch (final NamingException e) {
+ if (retryCount <= 0) {
+ throw e;
+ } else {
+ final int retryTimeout = NameLookupClient.this.retryTimeout
+ * (origRetryCount - retryCount + 1);
+ LOG.log(Level.WARNING,
+ "Caught Naming Exception while looking up " + id
+ + " with Name Server. Will retry " + retryCount
+ + " time(s) after waiting for " + retryTimeout + " msec.");
+ Thread.sleep(retryTimeout * retryCount);
+ --retryCount;
+ }
+ }
+ }
+ }
+
+ });
+ }
+
+ /**
+ * Retrieves an address for an identifier remotely
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ * @throws Exception
+ */
+ public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
+ // the lookup is not thread-safe, because concurrent replies may
+ // be read by the wrong thread.
+ // TODO: better fix uses a map of id's after REEF-198
+ synchronized (this) {
+
+ LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr});
+
+ final List<Identifier> ids = Arrays.asList(id);
+ final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
+ new LoggingLinkListener<NamingMessage>());
+ link.write(new NamingLookupRequest(ids));
+
+ final NamingLookupResponse resp;
+ for (; ; ) {
+ try {
+ resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS);
+ break;
+ } catch (final InterruptedException e) {
+ LOG.log(Level.INFO, "Lookup interrupted", e);
+ throw new NamingException(e);
+ }
+ }
+
+ final List<NameAssignment> list = resp.getNameAssignments();
+ if (list.isEmpty()) {
+ throw new NamingException("Cannot find " + id + " from the name server");
+ } else {
+ return list.get(0).getAddress();
+ }
+ }
+ }
+
+ /**
+ * Closes resources
+ */
+ @Override
+ public void close() throws Exception {
+ // Should not close transport as we did not
+ // create it
+ }
+
+ @NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100")
+ public static class RetryTimeout implements Name<Integer> {
+ }
+
+ @NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10")
+ public static class RetryCount implements Name<Integer> {
+ }
+}
+
+/**
+ * Naming lookup client transport event handler
+ */
+class NamingLookupClientHandler implements EventHandler<TransportEvent> {
+
+ private final EventHandler<NamingLookupResponse> handler;
+ private final Codec<NamingMessage> codec;
+
+ NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) {
+ this.handler = handler;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(final TransportEvent value) {
+ handler.onNext((NamingLookupResponse) codec.decode(value.getData()));
+ }
+
+}
+
+/**
+ * Naming lookup response handler
+ */
+class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> {
+
+ private final BlockingQueue<NamingLookupResponse> replyQueue;
+
+ NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) {
+ this.replyQueue = replyQueue;
+ }
+
+ @Override
+ public void onNext(final NamingLookupResponse value) {
+ replyQueue.offer(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
new file mode 100644
index 0000000..554a33b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NamingRegistry;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.serialization.NamingMessage;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest;
+import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming registry client
+ */
+public class NameRegistryClient implements Stage, NamingRegistry {
+
+ private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName());
+
+ private final SocketAddress serverSocketAddr;
+ private final Transport transport;
+ private final Codec<NamingMessage> codec;
+ private final BlockingQueue<NamingRegisterResponse> replyQueue;
+ private final long timeout;
+
+ /**
+ * Constructs a naming registry client
+ *
+ * @param serverAddr a name server address
+ * @param serverPort a name server port
+ * @param factory an identifier factory
+ */
+ public NameRegistryClient(
+ final String serverAddr, final int serverPort, final IdentifierFactory factory) {
+ this(serverAddr, serverPort, 10000, factory);
+ }
+
+ /**
+ * Constructs a naming registry client
+ *
+ * @param serverAddr a name server address
+ * @param serverPort a name server port
+ * @param timeout timeout in ms
+ * @param factory an identifier factory
+ */
+ public NameRegistryClient(final String serverAddr, final int serverPort,
+ final long timeout, final IdentifierFactory factory) {
+
+ this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+ this.timeout = timeout;
+ this.codec = NamingCodecFactory.createRegistryCodec(factory);
+ this.replyQueue = new LinkedBlockingQueue<>();
+ this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
+ new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)),
+ null, 3, 10000);
+ }
+
+ public NameRegistryClient(final String serverAddr, final int serverPort,
+ final long timeout, final IdentifierFactory factory,
+ final BlockingQueue<NamingRegisterResponse> replyQueue,
+ final Transport transport) {
+ this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
+ this.timeout = timeout;
+ this.codec = NamingCodecFactory.createFullCodec(factory);
+ this.replyQueue = replyQueue;
+ this.transport = transport;
+ }
+
+ /**
+ * Registers an (identifier, address) mapping
+ *
+ * @param id an identifier
+ * @param addr an Internet socket address
+ */
+ @Override
+ public void register(final Identifier id, final InetSocketAddress addr) throws Exception {
+
+ // needed to keep threads from reading the wrong response
+ // TODO: better fix matches replies to threads with a map after REEF-198
+ synchronized (this) {
+
+ LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr});
+
+ final Link<NamingMessage> link = this.transport.open(
+ this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>());
+
+ link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr)));
+
+ for (; ; ) {
+ try {
+ this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
+ break;
+ } catch (final InterruptedException e) {
+ LOG.log(Level.INFO, "Interrupted", e);
+ throw new NamingException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Unregisters an identifier
+ *
+ * @param id an identifier
+ */
+ @Override
+ public void unregister(Identifier id) throws IOException {
+ Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
+ new LinkListener<NamingMessage>() {
+ @Override
+ public void messageReceived(NamingMessage message) {
+ }
+ });
+ link.write(new NamingUnregisterRequest(id));
+ }
+
+ /**
+ * Closes resources
+ */
+ @Override
+ public void close() throws Exception {
+ // Should not close transport as we did not
+ // create it
+ }
+}
+
+/**
+ * Naming registry client transport event handler
+ */
+class NamingRegistryClientHandler implements EventHandler<TransportEvent> {
+ private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName());
+
+ private final EventHandler<NamingRegisterResponse> handler;
+ private final Codec<NamingMessage> codec;
+
+ NamingRegistryClientHandler(EventHandler<NamingRegisterResponse> handler, Codec<NamingMessage> codec) {
+ this.handler = handler;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(TransportEvent value) {
+ LOG.log(Level.FINE, value.toString());
+ handler.onNext((NamingRegisterResponse) codec.decode(value.getData()));
+ }
+}
+
+/**
+ * Naming register response handler
+ */
+class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> {
+
+ private final BlockingQueue<NamingRegisterResponse> replyQueue;
+
+ NamingRegistryResponseHandler(BlockingQueue<NamingRegisterResponse> replyQueue) {
+ this.replyQueue = replyQueue;
+ }
+
+ @Override
+ public void onNext(NamingRegisterResponse value) {
+ replyQueue.offer(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
new file mode 100644
index 0000000..5a4c765
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.MultiEventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.webserver.AvroReefServiceInfo;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server interface
+ */
+public interface NameServer extends Stage {
+
+ /**
+ * get port number
+ * @return
+ */
+ public int getPort();
+
+ /**
+ * Registers an (identifier, address) mapping locally
+ *
+ * @param id an identifier
+ * @param addr an Internet socket address
+ */
+ public void register(final Identifier id, final InetSocketAddress addr);
+
+ /**
+ * Unregisters an identifier locally
+ *
+ * @param id an identifier
+ */
+ public void unregister(final Identifier id);
+
+ /**
+ * Finds an address for an identifier locally
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ */
+ public InetSocketAddress lookup(final Identifier id);
+
+ /**
+ * Finds addresses for identifiers locally
+ *
+ * @param identifiers an Iterable of identifiers
+ * @return a list of name assignments
+ */
+ public List<NameAssignment> lookup(final Iterable<Identifier> identifiers);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
new file mode 100644
index 0000000..1912686
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerConfiguration.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+/**
+ * Configuration Module Builder for NameServer
+ */
+public final class NameServerConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The port used by name server
+ */
+ public static final OptionalParameter<Integer> NAME_SERVICE_PORT = new OptionalParameter<>();
+ /**
+ * DNS hostname running the name service
+ */
+ public static final OptionalParameter<String> NAME_SERVER_HOSTNAME = new OptionalParameter<>();
+ /**
+ * Identifier factory for the name service
+ */
+ public static final OptionalParameter<IdentifierFactory> NAME_SERVER_IDENTIFIER_FACTORY = new OptionalParameter<>();
+
+ public static final ConfigurationModule CONF = new NameServerConfiguration()
+ .bindNamedParameter(NameServerParameters.NameServerPort.class, NAME_SERVICE_PORT)
+ .bindNamedParameter(NameServerParameters.NameServerAddr.class, NAME_SERVER_HOSTNAME)
+ .bindNamedParameter(NameServerParameters.NameServerIdentifierFactory.class, NAME_SERVER_IDENTIFIER_FACTORY)
+ .bindImplementation(NameServer.class, NameServerImpl.class)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
new file mode 100644
index 0000000..306682a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.impl.MultiEventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+import org.apache.reef.webserver.AvroReefServiceInfo;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server implementation
+ */
+public class NameServerImpl implements NameServer {
+
+ private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
+
+ private final Transport transport;
+ private final Map<Identifier, InetSocketAddress> idToAddrMap;
+ private final ReefEventStateManager reefEventStateManager;
+ private final int port;
+
+ /**
+ * @param port a listening port number
+ * @param factory an identifier factory
+ * @deprecated inject the NameServer instead of new it up
+ * Constructs a name server
+ */
+ // TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
+ @Deprecated
+ public NameServerImpl(
+ final int port,
+ final IdentifierFactory factory) {
+
+ this.reefEventStateManager = null;
+ final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+ final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+ this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+ new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+ this.port = transport.getListeningPort();
+ this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+ LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+ }
+
+
+ /**
+ * Constructs a name server
+ *
+ * @param port a listening port number
+ * @param factory an identifier factory
+ * @param reefEventStateManager the event state manager used to register name server info
+ */
+ @Inject
+ public NameServerImpl(
+ final @Parameter(NameServerParameters.NameServerPort.class) int port,
+ final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
+ final ReefEventStateManager reefEventStateManager) {
+
+ this.reefEventStateManager = reefEventStateManager;
+ final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
+ final EventHandler<NamingMessage> handler = createEventHandler(codec);
+
+ this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
+ new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
+
+ this.port = transport.getListeningPort();
+ this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
+
+ this.reefEventStateManager.registerServiceInfo(
+ AvroReefServiceInfo.newBuilder()
+ .setServiceName("NameServer")
+ .setServiceInfo(getNameServerId())
+ .build());
+ LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
+ }
+
+ private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
+
+ final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
+ clazzToHandlerMap = new HashMap<>();
+
+ clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
+ clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
+ clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
+ final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
+
+ return handler;
+ }
+
+ /**
+ * Gets port
+ */
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Closes resources
+ */
+ @Override
+ public void close() throws Exception {
+ transport.close();
+ }
+
+ /**
+ * Registers an (identifier, address) mapping locally
+ *
+ * @param id an identifier
+ * @param addr an Internet socket address
+ */
+ @Override
+ public void register(final Identifier id, final InetSocketAddress addr) {
+ LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
+ idToAddrMap.put(id, addr);
+ }
+
+ /**
+ * Unregisters an identifier locally
+ *
+ * @param id an identifier
+ */
+ @Override
+ public void unregister(final Identifier id) {
+ LOG.log(Level.FINE, "id: " + id);
+ idToAddrMap.remove(id);
+ }
+
+ /**
+ * Finds an address for an identifier locally
+ *
+ * @param id an identifier
+ * @return an Internet socket address
+ */
+ @Override
+ public InetSocketAddress lookup(final Identifier id) {
+ LOG.log(Level.FINE, "id: {0}", id);
+ return idToAddrMap.get(id);
+ }
+
+ /**
+ * Finds addresses for identifiers locally
+ *
+ * @param identifiers an iterable of identifiers
+ * @return a list of name assignments
+ */
+ @Override
+ public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
+ LOG.log(Level.FINE, "identifiers");
+ final List<NameAssignment> nas = new ArrayList<>();
+ for (final Identifier id : identifiers) {
+ final InetSocketAddress addr = idToAddrMap.get(id);
+ LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
+ if (addr != null) {
+ nas.add(new NameAssignmentTuple(id, addr));
+ }
+ }
+ return nas;
+ }
+
+ private String getNameServerId() {
+ return NetUtils.getLocalAddress() + ":" + getPort();
+ }
+}
+
+/**
+ * Naming server transport event handler that invokes a specific naming message handler
+ */
+class NamingServerHandler implements EventHandler<TransportEvent> {
+
+ private final Codec<NamingMessage> codec;
+ private final EventHandler<NamingMessage> handler;
+
+ NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
+ this.codec = codec;
+ this.handler = handler;
+ }
+
+ @Override
+ public void onNext(final TransportEvent value) {
+ final byte[] data = value.getData();
+ final NamingMessage message = codec.decode(data);
+ message.setLink(value.getLink());
+ handler.onNext(message);
+ }
+}
+
+/**
+ * Naming lookup request handler
+ */
+class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
+
+ private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
+
+
+ private final NameServer server;
+ private final Codec<NamingMessage> codec;
+
+ public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+ this.server = server;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(final NamingLookupRequest value) {
+ final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
+ final byte[] resp = codec.encode(new NamingLookupResponse(nas));
+ try {
+ value.getLink().write(resp);
+ } catch (final IOException e) {
+ //Actually, there is no way Link.write can throw and IOException
+ //after netty4 merge. This needs to cleaned up
+ LOG.throwing("NamingLookupRequestHandler", "onNext", e);
+ }
+ }
+}
+
+/**
+ * Naming register request handler
+ */
+class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
+
+ private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
+
+
+ private final NameServer server;
+ private final Codec<NamingMessage> codec;
+
+ public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
+ this.server = server;
+ this.codec = codec;
+ }
+
+ @Override
+ public void onNext(final NamingRegisterRequest value) {
+ server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
+ final byte[] resp = codec.encode(new NamingRegisterResponse(value));
+ try {
+ value.getLink().write(resp);
+ } catch (final IOException e) {
+ //Actually, there is no way Link.write can throw and IOException
+ //after netty4 merge. This needs to cleaned up
+ LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
+ }
+ }
+}
+
+/**
+ * Naming unregister request handler
+ */
+class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
+
+ private final NameServer server;
+
+ public NamingUnregisterRequestHandler(final NameServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void onNext(final NamingUnregisterRequest value) {
+ server.unregister(value.getIdentifier());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
new file mode 100644
index 0000000..1888886
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerParameters.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+public class NameServerParameters {
+
+ @NamedParameter(doc = "port for the name service", default_value = "0", short_name = "nameport")
+ public class NameServerPort implements Name<Integer> {
+ }
+
+ @NamedParameter(doc = "DNS hostname running the name service")
+ public class NameServerAddr implements Name<String> {
+ }
+
+ @NamedParameter(doc = "identifier factory for the name service", default_class = StringIdentifierFactory.class)
+ public class NameServerIdentifierFactory implements Name<IdentifierFactory> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
new file mode 100644
index 0000000..8ec320a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.serialization.*;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.impl.MultiCodec;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory to create naming codecs
+ */
+class NamingCodecFactory {
+
+ /**
+ * Creates a codec only for lookup
+ *
+ * @param factory an identifier factory
+ * @return a codec
+ */
+ static Codec<NamingMessage> createLookupCodec(IdentifierFactory factory) {
+ Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+ = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+ clazzToCodecMap.put(NamingLookupRequest.class, new NamingLookupRequestCodec(factory));
+ clazzToCodecMap.put(NamingLookupResponse.class, new NamingLookupResponseCodec(factory));
+ Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+ return codec;
+ }
+
+ /**
+ * Creates a codec only for registration
+ *
+ * @param factory an identifier factory
+ * @return a codec
+ */
+ static Codec<NamingMessage> createRegistryCodec(IdentifierFactory factory) {
+ Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+ = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+ clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory));
+ clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+ clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory));
+ Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+ return codec;
+ }
+
+ /**
+ * Creates a codec for both lookup and registration
+ *
+ * @param factory an identifier factory
+ * @return a codec
+ */
+ static Codec<NamingMessage> createFullCodec(IdentifierFactory factory) {
+ Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap
+ = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>();
+ clazzToCodecMap.put(NamingLookupRequest.class, new NamingLookupRequestCodec(factory));
+ clazzToCodecMap.put(NamingLookupResponse.class, new NamingLookupResponseCodec(factory));
+ clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory));
+ clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+ clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory));
+ Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap);
+ return codec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
new file mode 100644
index 0000000..b5093b4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+/**
+ * Naming exception
+ */
+public class NamingException extends NetworkException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified detail message and cause
+ *
+ * @param s the detailed message
+ * @param e the cause
+ */
+ public NamingException(String s, Throwable e) {
+ super(s, e);
+ }
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified detail message
+ *
+ * @param s the detailed message
+ */
+ public NamingException(String s) {
+ super(s);
+ }
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified cause
+ *
+ * @param e the cause
+ */
+ public NamingException(Throwable e) {
+ super(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
new file mode 100644
index 0000000..9ed0f49
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/NamingRuntimeException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
+
+import org.apache.reef.io.network.exception.NetworkRuntimeException;
+
+/**
+ * Naming resourcemanager exception
+ */
+public class NamingRuntimeException extends NetworkRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified detail message and cause
+ *
+ * @param s the detailed message
+ * @param e the cause
+ */
+ public NamingRuntimeException(String s, Throwable e) {
+ super(s, e);
+ }
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified detail message
+ *
+ * @param s the detailed message
+ */
+ public NamingRuntimeException(String s) {
+ super(s);
+ }
+
+ /**
+ * Constructs a new resourcemanager naming exception with the specified cause
+ *
+ * @param e the cause
+ */
+ public NamingRuntimeException(Throwable e) {
+ super(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
new file mode 100644
index 0000000..86741b8
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/exception/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.exception;
\ No newline at end of file