You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/06/28 07:50:34 UTC
incubator-reef git commit: [REEF-413] Make the NameClient injectable
Repository: incubator-reef
Updated Branches:
refs/heads/master 9c8fee2a1 -> 32c964579
[REEF-413] Make the NameClient injectable
This pull request addressed the issue by
* Introduce an interface of NameResolver
* NameClient implements NameResolver
JIRA:
[REEF-413](https://issues.apache.org/jira/browse/REEF-413)
Pull Request:
This closes #248
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/32c96457
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/32c96457
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/32c96457
Branch: refs/heads/master
Commit: 32c9645794879874e788fe63d2e6ad5095c6ca2e
Parents: 9c8fee2
Author: taegeonum <ta...@gmail.com>
Authored: Fri Jun 26 20:45:09 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Sat Jun 27 22:49:15 2015 -0700
----------------------------------------------------------------------
.../group/impl/driver/GroupCommDriverImpl.java | 30 +++++++--
.../reef/io/network/impl/NetworkService.java | 54 ++++++++++-----
.../network/impl/NetworkServiceParameters.java | 5 +-
.../reef/io/network/naming/NameClient.java | 49 +++++++++++---
.../io/network/naming/NameLookupClient.java | 12 +---
.../reef/io/network/naming/NameResolver.java | 31 +++++++++
.../naming/NameResolverConfiguration.java | 71 ++++++++++++++++++++
.../parameters/NameResolverCacheTimeout.java | 27 ++++++++
.../NameResolverIdentifierFactory.java | 30 +++++++++
.../parameters/NameResolverNameServerAddr.java | 27 ++++++++
.../parameters/NameResolverNameServerPort.java | 28 ++++++++
.../parameters/NameResolverRetryCount.java | 28 ++++++++
.../parameters/NameResolverRetryTimeout.java | 28 ++++++++
.../reef/services/network/NameClientTest.java | 35 +++++++---
.../reef/services/network/NamingTest.java | 18 +++--
.../services/network/NetworkServiceTest.java | 68 +++++++++++++++----
16 files changed, 470 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 5ac4fa8..4710c31 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -36,15 +36,20 @@ import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.*;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.io.network.naming.NameServerImpl;
-import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
+import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.SingletonAsserter;
import org.apache.reef.wake.EStage;
@@ -170,7 +175,22 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
groupCommFailedEvaluatorHandler);
this.groupCommMessageHandler = new GroupCommMessageHandler();
this.groupCommMessageStage = new SingleThreadStage<>("GroupCommMessageStage", groupCommMessageHandler, 100 * 1000);
- this.netService = new NetworkService<>(idFac, 0, nameServiceAddr, nameServicePort,
+
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServicePort)
+ .build())
+ .build();
+
+ final Injector injector = Tang.Factory.getTang().newInjector(nameResolverConf);
+ NameResolver nameResolver = null;
+ try {
+ nameResolver = injector.getInstance(NameResolver.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+
+ this.netService = new NetworkService<>(idFac, 0, nameResolver,
new GroupCommunicationMessageCodec(), tpFactory,
new EventHandler<Message<GroupCommunicationMessage>>() {
@@ -243,8 +263,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
GroupCommNetworkHandlerImpl.class)
.bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
ExceptionHandler.class)
- .bindNamedParameter(NameServerParameters.NameServerAddr.class, nameServiceAddr)
- .bindNamedParameter(NameServerParameters.NameServerPort.class, Integer.toString(nameServicePort))
+ .bindNamedParameter(NameResolverNameServerAddr.class, nameServiceAddr)
+ .bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(nameServicePort))
.bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build();
LOG.exiting("GroupCommDriverImpl", "getServiceConf", confSerializer.toString(retVal));
return retVal;
@@ -295,4 +315,4 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
return groupCommFailedEvaluatorStage;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
index 03d074c..3f49b11 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -23,11 +23,12 @@ import org.apache.reef.io.naming.Naming;
import org.apache.reef.io.network.Connection;
import org.apache.reef.io.network.ConnectionFactory;
import org.apache.reef.io.network.Message;
-import org.apache.reef.wake.remote.transport.TransportFactory;
-import org.apache.reef.io.network.naming.NameCache;
import org.apache.reef.io.network.naming.NameClient;
-import org.apache.reef.io.network.naming.NameLookupClient;
-import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
+import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
@@ -40,6 +41,7 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
import javax.inject.Inject;
@@ -62,8 +64,8 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
static {
try {
final Injector injector = Tang.Factory.getTang().newInjector();
- retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
- retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+ retryCount = injector.getNamedInstance(NameResolverRetryCount.class);
+ retryTimeout = injector.getNamedInstance(NameResolverRetryTimeout.class);
} catch (final InjectionException ex) {
final String msg = "Exception while trying to find default values for retryCount & Timeout";
LOG.log(Level.SEVERE, msg, ex);
@@ -74,7 +76,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
private final IdentifierFactory factory;
private final Codec<T> codec;
private final Transport transport;
- private final NameClient nameClient;
+ private final NameResolver nameResolver;
private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
private final EStage<Identifier> nameServiceUnregisteringStage;
@@ -148,10 +150,29 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
public NetworkService(
@Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory,
@Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort,
- @Parameter(NameServerParameters.NameServerAddr.class) final String nameServerAddr,
- @Parameter(NameServerParameters.NameServerPort.class) final int nameServerPort,
- @Parameter(NameLookupClient.RetryCount.class) final int retryCount,
- @Parameter(NameLookupClient.RetryTimeout.class) final int retryTimeout,
+ @Parameter(NameResolverNameServerAddr.class) final String nameServerAddr,
+ @Parameter(NameResolverNameServerPort.class) final int nameServerPort,
+ @Parameter(NameResolverRetryCount.class) final int retryCount,
+ @Parameter(NameResolverRetryTimeout.class) final int retryTimeout,
+ @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec,
+ @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory,
+ @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler,
+ @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) final EventHandler<Exception> exHandler,
+ final LocalAddressProvider localAddressProvider) {
+ this(factory, nsPort, new NameClient(nameServerAddr, nameServerPort,
+ 30000, factory, retryCount, retryTimeout, localAddressProvider, tpFactory),
+ codec, tpFactory, recvHandler, exHandler, localAddressProvider);
+ }
+
+ /**
+ * @deprecated in 0.12. Use Tang to obtain an instance of this instead.
+ */
+ @Deprecated
+ @Inject
+ public NetworkService(
+ @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory,
+ @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort,
+ final NameResolver nameResolver,
@Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec,
@Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory,
@Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler,
@@ -164,15 +185,14 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
new LoggingEventHandler<TransportEvent>(),
new MessageHandler<T>(recvHandler, codec, factory), exHandler);
- this.nameClient = new NameClient(nameServerAddr, nameServerPort,
- factory, retryCount, retryTimeout, new NameCache(30000), localAddressProvider);
+ this.nameResolver = nameResolver;
this.nameServiceRegisteringStage = new SingleThreadStage<>(
"NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
@Override
public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
try {
- nameClient.register(tuple.getKey(), tuple.getValue());
+ nameResolver.register(tuple.getKey(), tuple.getValue());
LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
} catch (final Exception ex) {
final String msg = "Unable to register " + tuple.getKey() + "with name service";
@@ -187,7 +207,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
@Override
public void onNext(final Identifier id) {
try {
- nameClient.unregister(id);
+ nameResolver.unregister(id);
LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
} catch (final Exception ex) {
final String msg = "Unable to unregister " + id + " with name service";
@@ -227,7 +247,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
}
public Naming getNameClient() {
- return this.nameClient;
+ return this.nameResolver;
}
public IdentifierFactory getIdentifierFactory() {
@@ -242,7 +262,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
public void close() throws Exception {
LOG.log(Level.FINE, "Shutting down");
this.transport.close();
- this.nameClient.close();
+ this.nameResolver.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
index 865cc3d..13d9281 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -18,14 +18,14 @@
*/
package org.apache.reef.io.network.impl;
-import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
-import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
public class NetworkServiceParameters {
@@ -57,5 +57,4 @@ public class NetworkServiceParameters {
@NamedParameter(doc = "network exception handler for the network service", short_name = "exhandler")
public static class NetworkServiceExceptionHandler implements Name<EventHandler<?>> {
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
index bdeb7db..c391b67 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -18,25 +18,26 @@
*/
package org.apache.reef.io.network.naming;
-import org.apache.reef.io.naming.Naming;
import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
+import org.apache.reef.io.network.naming.parameters.*;
import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
import org.apache.reef.io.network.naming.serialization.NamingMessage;
import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
+import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.cache.Cache;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.Stage;
import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
import org.apache.reef.wake.remote.transport.TransportFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import javax.inject.Inject;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
@@ -45,16 +46,15 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * Naming client.
+ * Naming client looking up remote server.
*/
-public class NameClient implements Stage, Naming {
+public final class NameClient implements NameResolver {
private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
private NameLookupClient lookupClient;
private NameRegistryClient registryClient;
private Transport transport;
-
@Deprecated
public NameClient(final String serverAddr,
final int serverPort,
@@ -73,6 +73,7 @@ public class NameClient implements Stage, Naming {
* @param factory an identifier factory
* @param cache a cache
*/
+ @Deprecated
public NameClient(final String serverAddr,
final int serverPort,
final IdentifierFactory factory,
@@ -110,6 +111,7 @@ public class NameClient implements Stage, Naming {
* @param factory an identifier factory
* @param cache a cache
*/
+ @Deprecated
public NameClient(final String serverAddr,
final int serverPort,
final long timeout,
@@ -118,8 +120,8 @@ public class NameClient implements Stage, Naming {
final int retryTimeout,
final Cache<Identifier, InetSocketAddress> cache,
final LocalAddressProvider localAddressProvider) {
- this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout,
- cache, localAddressProvider, new MessagingTransportFactory());
+ this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout,
+ cache, localAddressProvider, new MessagingTransportFactory());
}
/**
@@ -132,6 +134,7 @@ public class NameClient implements Stage, Naming {
* @param cache a cache
* @param tpFactory transport factory
*/
+ @Deprecated
public NameClient(final String serverAddr,
final int serverPort,
final long timeout,
@@ -141,7 +144,35 @@ public class NameClient implements Stage, Naming {
final Cache<Identifier, InetSocketAddress> cache,
final LocalAddressProvider localAddressProvider,
final TransportFactory tpFactory) {
+ this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout, localAddressProvider, tpFactory);
+ }
+ /**
+ * Constructs a naming client.
+ *
+ * @param serverAddr a server address
+ * @param serverPort a server port number
+ * @param timeout timeout in ms
+ * @param factory an identifier factory
+ * @param retryCount the number of retries
+ * @param retryTimeout retry timeout
+ * @param localAddressProvider a local address provider
+ * @param tpFactory transport factory
+ * @deprecated in 0.12. Use Tang to obtain an instance of this instead.
+ */
+ @Deprecated
+ @Inject
+ public NameClient(
+ @Parameter(NameResolverNameServerAddr.class) final String serverAddr,
+ @Parameter(NameResolverNameServerPort.class) final int serverPort,
+ @Parameter(NameResolverCacheTimeout.class) final long timeout,
+ @Parameter(NameResolverIdentifierFactory.class) final IdentifierFactory factory,
+ @Parameter(NameResolverRetryCount.class) final int retryCount,
+ @Parameter(NameResolverRetryTimeout.class) final int retryTimeout,
+ final LocalAddressProvider localAddressProvider,
+ final TransportFactory tpFactory) {
+
+ NameCache cache = new NameCache(timeout);
final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
@@ -272,4 +303,4 @@ class NamingResponseHandler implements EventHandler<NamingMessage> {
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
index 38d0894..e27ca74 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -24,8 +24,6 @@ import org.apache.reef.io.network.naming.exception.NamingException;
import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
import org.apache.reef.io.network.naming.serialization.NamingMessage;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.util.cache.Cache;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
@@ -38,9 +36,9 @@ import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
-import org.apache.reef.wake.remote.transport.TransportFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -280,14 +278,6 @@ public class NameLookupClient implements Stage, NamingLookup {
// Should not close transport as we did not
// create it
}
-
- @NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100")
- public static class RetryTimeout implements Name<Integer> {
- }
-
- @NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10")
- public static class RetryCount implements Name<Integer> {
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolver.java
new file mode 100644
index 0000000..8036b00
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.Naming;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.Stage;
+
+/**
+ * NameClient interface
+ */
+@DefaultImplementation(NameClient.class)
+public interface NameResolver extends Stage, Naming {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolverConfiguration.java
new file mode 100644
index 0000000..f9ccb3c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameResolverConfiguration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.parameters.*;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+/**
+ * Configuration Module Builder for NameResolver.
+ */
+public final class NameResolverConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The port used by name server.
+ */
+ public static final RequiredParameter<Integer> NAME_SERVICE_PORT = new RequiredParameter<>();
+ /**
+ * DNS hostname running the name service.
+ */
+ public static final RequiredParameter<String> NAME_SERVER_HOSTNAME = new RequiredParameter<>();
+
+ /**
+ * Identifier factory for NameClient.
+ */
+ public static final OptionalParameter<IdentifierFactory> IDENTIFIER_FACTORY = new OptionalParameter<>();
+
+ /**
+ * The timeout of caching lookup.
+ */
+ public static final OptionalParameter<Long> CACHE_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The timeout of retrying connection.
+ */
+ public static final OptionalParameter<Integer> RETRY_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The number of retrying connection.
+ */
+ public static final OptionalParameter<Integer> RETRY_COUNT = new OptionalParameter<>();
+
+ public static final ConfigurationModule CONF = new NameResolverConfiguration()
+ .bindNamedParameter(NameResolverNameServerPort.class, NAME_SERVICE_PORT)
+ .bindNamedParameter(NameResolverNameServerAddr.class, NAME_SERVER_HOSTNAME)
+ .bindNamedParameter(NameResolverIdentifierFactory.class, IDENTIFIER_FACTORY)
+ .bindNamedParameter(NameResolverCacheTimeout.class, CACHE_TIMEOUT)
+ .bindNamedParameter(NameResolverRetryTimeout.class, RETRY_TIMEOUT)
+ .bindNamedParameter(NameResolverRetryCount.class, RETRY_COUNT)
+ .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverCacheTimeout.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverCacheTimeout.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverCacheTimeout.java
new file mode 100644
index 0000000..731b6d8
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverCacheTimeout.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+
+@NamedParameter(doc = "How much time name lookup client caching(msec)?", default_value = "30000")
+public final class NameResolverCacheTimeout implements Name<Long> {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverIdentifierFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverIdentifierFactory.java
new file mode 100644
index 0000000..b7998e7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverIdentifierFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.IdentifierFactory;
+
+
+@NamedParameter(doc = "Identifier factory of NameClient", default_class = StringIdentifierFactory.class)
+public final class NameResolverIdentifierFactory implements Name<IdentifierFactory> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerAddr.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerAddr.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerAddr.java
new file mode 100644
index 0000000..bee90d1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerAddr.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "An address of NameServer")
+public final class NameResolverNameServerAddr implements Name<String> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerPort.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerPort.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerPort.java
new file mode 100644
index 0000000..2532d4e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverNameServerPort.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+
+@NamedParameter(doc = "A port number of a NameServer")
+public final class NameResolverNameServerPort implements Name<Integer> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryCount.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryCount.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryCount.java
new file mode 100644
index 0000000..baa98fd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryCount.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+
+@NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10")
+public final class NameResolverRetryCount implements Name<Integer> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryTimeout.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryTimeout.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryTimeout.java
new file mode 100644
index 0000000..94d0a0a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/parameters/NameResolverRetryTimeout.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+
+@NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100")
+public final class NameResolverRetryTimeout implements Name<Integer> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
index 571b8b4..c46e2be 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -20,7 +20,10 @@ package org.apache.reef.services.network;
import org.apache.reef.io.network.naming.*;
import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -49,8 +52,8 @@ public class NameClientTest {
static {
Tang tang = Tang.Factory.getTang();
try {
- retryCount = tang.newInjector().getNamedInstance(NameLookupClient.RetryCount.class);
- retryTimeout = tang.newInjector().getNamedInstance(NameLookupClient.RetryTimeout.class);
+ retryCount = tang.newInjector().getNamedInstance(NameResolverRetryCount.class);
+ retryTimeout = tang.newInjector().getNamedInstance(NameResolverRetryTimeout.class);
} catch (InjectionException e1) {
throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e1);
}
@@ -85,9 +88,16 @@ public class NameClientTest {
try (final NameServer server = injector.getInstance(NameServer.class)) {
int serverPort = server.getPort();
- try (NameClient client = new NameClient(localAddress, serverPort, factory, retryCount, retryTimeout,
- new NameCache(10000), localAddressProvider)) {
- Identifier id = factory.getNewInstance("Task1");
+ final Configuration nameResolverConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
+ .set(NameResolverConfiguration.CACHE_TIMEOUT, 10000)
+ .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
+ .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
+ .build();
+
+ try (final NameResolver client = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) {
+ final Identifier id = factory.getNewInstance("Task1");
client.register(id, new InetSocketAddress(localAddress, 7001));
client.unregister(id);
Thread.sleep(100);
@@ -112,9 +122,16 @@ public class NameClientTest {
try (final NameServer server = injector.getInstance(NameServer.class)) {
int serverPort = server.getPort();
- try (NameClient client = new NameClient(localAddress, serverPort, factory, retryCount, retryTimeout,
- new NameCache(150), localAddressProvider)) {
- Identifier id = factory.getNewInstance("Task1");
+ final Configuration nameResolverConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
+ .set(NameResolverConfiguration.CACHE_TIMEOUT, 150)
+ .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
+ .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
+ .build();
+
+ try (final NameResolver client = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) {
+ final Identifier id = factory.getNewInstance("Task1");
client.register(id, new InetSocketAddress(localAddress, 7001));
client.lookup(id);// caches the entry
client.unregister(id);
@@ -138,4 +155,4 @@ public class NameClientTest {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
index ef6f30a..462390e 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -20,7 +20,10 @@ package org.apache.reef.services.network;
import org.apache.reef.io.naming.NameAssignment;
import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -53,8 +56,8 @@ public class NamingTest {
static {
try {
final Injector injector = Tang.Factory.getTang().newInjector();
- retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
- retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+ retryCount = injector.getNamedInstance(NameResolverRetryCount.class);
+ retryTimeout = injector.getNamedInstance(NameResolverRetryTimeout.class);
} catch (final InjectionException ex) {
final String msg = "Exception while trying to find default values for retryCount & Timeout";
LOG.log(Level.SEVERE, msg, ex);
@@ -308,8 +311,15 @@ public class NamingTest {
// registration
// invoke registration from the client side
- final NameClient client = new NameClient(localAddress, this.port,
- this.factory, retryCount, retryTimeout, new NameCache(this.TTL), this.localAddressProvider);
+ Configuration nameResolverConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
+ .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
+ .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
+ .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
+ .build();
+
+ final NameResolver client = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class);
for (final Identifier id : idToAddrMap.keySet()) {
client.register(id, idToAddrMap.get(id));
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/32c96457/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index 3c5f0b8..c1f76b8 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -22,11 +22,11 @@ import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.Connection;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.impl.NetworkService;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.io.network.naming.*;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.services.network.util.Monitor;
import org.apache.reef.services.network.util.StringCodec;
+import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -84,8 +84,17 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service receiver start");
// network service
final String name2 = "task2";
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
+ final NameResolver nameResolver = injector2.getInstance(NameResolver.class);
+
NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider);
ns2.registerId(factory.getNewInstance(name2));
@@ -94,7 +103,7 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service sender start");
final String name1 = "task1";
- final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, this.localAddress, nameServerPort,
+ final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider);
ns1.registerId(factory.getNewInstance(name1));
@@ -145,8 +154,17 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service receiver start");
// network service
final String name2 = "task2";
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
+ final NameResolver nameResolver = injector2.getInstance(NameResolver.class);
+
NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider);
ns2.registerId(factory.getNewInstance(name2));
@@ -156,7 +174,7 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service sender start");
final String name1 = "task1";
NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider);
ns1.registerId(factory.getNewInstance(name1));
@@ -229,8 +247,16 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service receiver start");
// network service
final String name2 = "task2-" + tt;
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector = Tang.Factory.getTang().newInjector(nameResolverConf);
+ final NameResolver nameResolver = injector.getInstance(NameResolver.class);
NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider);
ns2.registerId(factory.getNewInstance(name2));
@@ -240,7 +266,7 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service sender start");
final String name1 = "task1-" + tt;
NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider);
ns1.registerId(factory.getNewInstance(name1));
@@ -316,8 +342,16 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service receiver start");
// network service
final String name2 = "task2";
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
+ final NameResolver nameResolver = injector2.getInstance(NameResolver.class);
NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name2, monitor, totalNumMessages), new ExceptionHandler(), localAddressProvider);
ns2.registerId(factory.getNewInstance(name2));
@@ -327,7 +361,7 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service sender start");
final String name1 = "task1";
NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider);
ns1.registerId(factory.getNewInstance(name1));
@@ -407,8 +441,16 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service receiver start");
// network service
final String name2 = "task2";
+ final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
+ final NameResolver nameResolver = injector2.getInstance(NameResolver.class);
NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider);
ns2.registerId(factory.getNewInstance(name2));
@@ -418,7 +460,7 @@ public class NetworkServiceTest {
LOG.log(Level.FINEST, "=== Test network service sender start");
final String name1 = "task1";
NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, this.localAddress, nameServerPort,
+ factory, 0, nameResolver,
new StringCodec(), new MessagingTransportFactory(localAddressProvider),
new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider);
ns1.registerId(factory.getNewInstance(name1));
@@ -506,4 +548,4 @@ public class NetworkServiceTest {
System.err.println(error);
}
}
-}
\ No newline at end of file
+}