You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/12/14 16:11:42 UTC
reef git commit: [REEF-991] Remove RangeTcpPortProvider.Default and
DefaultRemoteManagerImplementation constructor
Repository: reef
Updated Branches:
refs/heads/master e5a11f3bd -> ccecdd577
[REEF-991] Remove RangeTcpPortProvider.Default and DefaultRemoteManagerImplementation constructor
This PR makes the following changes:
* Remove `RangeTcpPortProvider.Default` and its usage
* Remove the deprecated `DefaultRemoteManagerImplementation` constructor and its usage
* Make the `DefaultRemoteManagerImplementation` class `final`
JIRA:
[REEF-991](https://issues.apache.org/jira/browse/REEF-991)
Pull request:
This closes #666
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ccecdd57
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ccecdd57
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ccecdd57
Branch: refs/heads/master
Commit: ccecdd5778829d118c3d29f9b75c62e68f1260bf
Parents: e5a11f3
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Nov 20 20:01:54 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Dec 14 07:09:57 2015 -0800
----------------------------------------------------------------------
.../remote/DefaultRemoteManagerFactory.java | 192 +++++++++++++++++++
.../reef/wake/remote/RemoteManagerFactory.java | 1 -
.../impl/DefaultRemoteManagerFactory.java | 163 ----------------
.../DefaultRemoteManagerImplementation.java | 25 +--
.../wake/remote/ports/RangeTcpPortProvider.java | 10 -
.../netty/MessagingTransportFactory.java | 38 +++-
.../wake/test/remote/RemoteManagerTest.java | 19 +-
7 files changed, 238 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
new file mode 100644
index 0000000..442d4fa
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote;
+
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+
+import javax.inject.Inject;
+
+/**
+ * Default implementation of RemoteManagerFactory.
+ */
+final class DefaultRemoteManagerFactory implements RemoteManagerFactory {
+
+ private final Injector injector;
+
+ private final Codec<?> codec;
+ private final EventHandler<Throwable> errorHandler;
+ private final boolean orderingGuarantee;
+ private final int numberOfTries;
+ private final int retryTimeout;
+ private final LocalAddressProvider localAddressProvider;
+ private final TransportFactory transportFactory;
+ private final TcpPortProvider tcpPortProvider;
+
+ @Inject
+ private DefaultRemoteManagerFactory(
+ @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<?> codec,
+ @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
+ @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
+ @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
+ @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
+ final LocalAddressProvider localAddressProvider,
+ final TransportFactory tpFactory,
+ final TcpPortProvider tcpPortProvider,
+ final Injector injector) {
+ this.injector = injector.forkInjector();
+ this.codec = codec;
+ this.errorHandler = errorHandler;
+ this.orderingGuarantee = orderingGuarantee;
+ this.numberOfTries = numberOfTries;
+ this.retryTimeout = retryTimeout;
+ this.localAddressProvider = localAddressProvider;
+ this.transportFactory = tpFactory;
+ this.tcpPortProvider = tcpPortProvider;
+ }
+
+ @Override
+ public RemoteManager getInstance(final String name) {
+ try {
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, this.codec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, this.errorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
+ newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
+ return newInjector.getInstance(RemoteManager.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:hiddenfield")
+ public <T> RemoteManager getInstance(final String name,
+ final String hostAddress,
+ final int listeningPort,
+ final Codec<T> codec,
+ final EventHandler<Throwable> errorHandler,
+ final boolean orderingGuarantee,
+ final int numberOfTries,
+ final int retryTimeout,
+ final LocalAddressProvider localAddressProvider,
+ final TcpPortProvider tcpPortProvider) {
+ try {
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
+ newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
+ newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, localAddressProvider);
+ newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
+ return newInjector.getInstance(RemoteManager.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:hiddenfield")
+ public <T> RemoteManager getInstance(final String name,
+ final String hostAddress,
+ final int listeningPort,
+ final Codec<T> codec,
+ final EventHandler<Throwable> errorHandler,
+ final boolean orderingGuarantee,
+ final int numberOfTries,
+ final int retryTimeout) {
+ try {
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
+ newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
+ newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
+ newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
+ return newInjector.getInstance(RemoteManager.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:hiddenfield")
+ public <T> RemoteManager getInstance(
+ final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) {
+ try {
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
+ newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
+ return newInjector.getInstance(RemoteManager.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:hiddenfield")
+ public <T> RemoteManager getInstance(final String name,
+ final int listeningPort,
+ final Codec<T> codec,
+ final EventHandler<Throwable> errorHandler) {
+ try {
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
+ newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
+ newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
+ return newInjector.getInstance(RemoteManager.class);
+ } catch (InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
index b27f7bc..04c928d 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
@@ -21,7 +21,6 @@ package org.apache.reef.wake.remote;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.impl.DefaultRemoteManagerFactory;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
deleted file mode 100644
index bb8ff2d..0000000
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.remote.impl;
-
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.RemoteConfiguration;
-import org.apache.reef.wake.remote.RemoteManager;
-import org.apache.reef.wake.remote.RemoteManagerFactory;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
-import org.apache.reef.wake.remote.transport.TransportFactory;
-
-import javax.inject.Inject;
-
-/**
- * Default implementation of RemoteManagerFactory.
- */
-public final class DefaultRemoteManagerFactory implements RemoteManagerFactory {
-
- private final Codec<?> codec;
- private final EventHandler<Throwable> errorHandler;
- private final boolean orderingGuarantee;
- private final int numberOfTries;
- private final int retryTimeout;
- private final LocalAddressProvider localAddressProvider;
- private final TransportFactory tpFactory;
-
- @Inject
- private DefaultRemoteManagerFactory(
- @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<?> codec,
- @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
- @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
- @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
- @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TransportFactory tpFactory) {
- this.codec = codec;
- this.errorHandler = errorHandler;
- this.orderingGuarantee = orderingGuarantee;
- this.numberOfTries = numberOfTries;
- this.retryTimeout = retryTimeout;
- this.localAddressProvider = localAddressProvider;
- this.tpFactory = tpFactory;
- }
-
- // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor.
- @Override
- public RemoteManager getInstance(final String name) {
- return new DefaultRemoteManagerImplementation(name,
- DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider
- 0, // Indicate to use the tcpPortProvider
- this.codec,
- this.errorHandler,
- this.orderingGuarantee,
- this.numberOfTries,
- this.retryTimeout,
- this.localAddressProvider,
- this.tpFactory);
- }
-
- // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor.
- @Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final String hostAddress,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler,
- final boolean orderingGuarantee,
- final int numberOfTries,
- final int retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TcpPortProvider tcpPortProvider) {
- return new DefaultRemoteManagerImplementation(name,
- hostAddress,
- listeningPort,
- codec,
- errorHandler,
- orderingGuarantee,
- numberOfTries,
- retryTimeout,
- localAddressProvider,
- tpFactory);
- }
-
- // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor.
- @Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final String hostAddress,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler,
- final boolean orderingGuarantee,
- final int numberOfTries,
- final int retryTimeout) {
- return new DefaultRemoteManagerImplementation(name,
- hostAddress,
- listeningPort,
- codec,
- errorHandler,
- orderingGuarantee,
- numberOfTries,
- retryTimeout,
- this.localAddressProvider,
- this.tpFactory);
-
- }
-
- // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor.
- @Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(
- final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) {
- return new DefaultRemoteManagerImplementation(name,
- DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider
- 0, // Indicate to use the tcpPortProvider,
- codec,
- errorHandler,
- this.orderingGuarantee,
- this.numberOfTries,
- this.retryTimeout,
- this.localAddressProvider,
- this.tpFactory);
- }
-
- // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor.
- @Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler) {
- return new DefaultRemoteManagerImplementation(name,
- DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider
- listeningPort,
- codec,
- errorHandler,
- this.orderingGuarantee,
- this.numberOfTries,
- this.retryTimeout,
- this.localAddressProvider,
- this.tpFactory);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index 3aee204..3e972fe 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -24,7 +24,6 @@ import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.StageManager;
import org.apache.reef.wake.remote.*;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
@@ -43,7 +42,7 @@ import java.util.logging.Logger;
/**
* Default remote manager implementation.
*/
-public class DefaultRemoteManagerImplementation implements RemoteManager {
+public final class DefaultRemoteManagerImplementation implements RemoteManager {
private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName());
@@ -66,28 +65,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager {
*/
public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
- /**
- * @deprecated have an instance injected instead.
- */
- @Deprecated
- @Inject
- public <T> DefaultRemoteManagerImplementation(
- @Parameter(RemoteConfiguration.ManagerName.class) final String name,
- @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
- @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
- @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
- @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
- @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
- @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
- @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TransportFactory tpFactory) {
- this(name, hostAddress, listeningPort, codec, errorHandler, orderingGuarantee, numberOfTries, retryTimeout,
- localAddressProvider, tpFactory, RangeTcpPortProvider.Default);
- }
-
@Inject
- private <T> DefaultRemoteManagerImplementation(
+ private <T> DefaultRemoteManagerImplementation(
@Parameter(RemoteConfiguration.ManagerName.class) final String name,
@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
@Parameter(RemoteConfiguration.Port.class) final int listeningPort,
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
index a5669be..fe959a2 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
@@ -58,16 +58,6 @@ public final class RangeTcpPortProvider implements TcpPortProvider {
return new RandomRangeIterator(portRangeBegin, portRangeCount, portRangeTryCount);
}
- /**
- * @deprecated in 0.12 have an instance injected instead.
- */
- @Deprecated
- @SuppressWarnings("checkstyle:constantname")
- public static final RangeTcpPortProvider Default = new RangeTcpPortProvider(
- Integer.parseInt(TcpPortRangeBegin.DEFAULT_VALUE),
- Integer.parseInt(TcpPortRangeCount.DEFAULT_VALUE),
- Integer.parseInt(TcpPortRangeTryCount.DEFAULT_VALUE));
-
@Override
public String toString() {
return "RangeTcpPortProvider{" +
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
index efc67de..0409553 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
@@ -27,7 +27,6 @@ import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
@@ -81,19 +80,46 @@ public class MessagingTransportFactory implements TransportFactory {
}
}
- // TODO[REEF-547]: This method uses deprecated RangeTcpPortProvider.Default. Must remove usages and deprecate.
+ /**
+ * Creates a transport.
+ *
+ * @param hostAddress a host address
+ * @param port a listening port
+ * @param clientStage a client stage
+ * @param serverStage a server stage
+ * @param numberOfTries a number of tries
+ * @param retryTimeout a timeout for retry
+ */
@Override
- public Transport newInstance(final String hostAddress, final int port,
+ public Transport newInstance(final String hostAddress,
+ final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout) {
- return newInstance(hostAddress, port, clientStage,
- serverStage, numberOfTries, retryTimeout, RangeTcpPortProvider.Default);
+ try {
+ TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
+ return newInstance(hostAddress, port, clientStage,
+ serverStage, numberOfTries, retryTimeout, tcpPortProvider);
+ } catch (final InjectionException e) {
+ throw new RuntimeException(e);
+ }
}
+ /**
+ * Creates a transport.
+ *
+ * @param hostAddress a host address
+ * @param port a listening port
+ * @param clientStage a client stage
+ * @param serverStage a server stage
+ * @param numberOfTries a number of tries
+ * @param retryTimeout a timeout for retry
+ * @param tcpPortProvider a provider for TCP port
+ */
@Override
- public Transport newInstance(final String hostAddress, final int port,
+ public Transport newInstance(final String hostAddress,
+ final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
index a46a54e..196d7d6 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation;
import org.apache.reef.wake.remote.impl.MultiCodec;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.test.util.Monitor;
import org.apache.reef.wake.test.util.TimeoutHandler;
import org.junit.Assert;
@@ -88,7 +88,7 @@ public class RemoteManagerTest {
final RemoteManager rm = this.remoteManagerFactory.getInstance(
"name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000,
- localAddressProvider, RangeTcpPortProvider.Default);
+ localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class));
final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation();
final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT);
@@ -192,7 +192,7 @@ public class RemoteManagerTest {
final RemoteManager rm = this.remoteManagerFactory.getInstance(
"name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000,
- localAddressProvider, RangeTcpPortProvider.Default);
+ localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class));
final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation();
final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT);
@@ -236,7 +236,7 @@ public class RemoteManagerTest {
final RemoteManager rm = this.remoteManagerFactory.getInstance(
"name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000,
- localAddressProvider, RangeTcpPortProvider.Default);
+ localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class));
final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation();
final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT);
@@ -301,9 +301,14 @@ public class RemoteManagerTest {
final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap);
final String hostAddress = localAddressProvider.getLocalAddress();
- return remoteManagerFactory.getInstance(rmName, hostAddress, localPort,
- codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout,
- localAddressProvider, RangeTcpPortProvider.Default);
+ try {
+ TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
+ return remoteManagerFactory.getInstance(rmName, hostAddress, localPort,
+ codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout,
+ localAddressProvider, tcpPortProvider);
+ } catch (final InjectionException e) {
+ throw new RuntimeException(e);
+ }
}
private class SendingRemoteManagerThread implements Callable<Integer> {