You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/07/28 07:15:51 UTC
[03/12] incubator-reef git commit: [REEF-513] Add FinalParameters to checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
index 902722d..00a38e6 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
@@ -53,21 +53,21 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider {
public String getLocalAddress() {
// This method is surprisingly slow: It was causing unit test timeouts, so we memoize the result.
if (cached.get() == null) {
- Enumeration<NetworkInterface> ifaces;
+ final Enumeration<NetworkInterface> ifaces;
try {
ifaces = NetworkInterface.getNetworkInterfaces();
- TreeSet<Inet4Address> sortedAddrs = new TreeSet<>(new AddressComparator());
+ final TreeSet<Inet4Address> sortedAddrs = new TreeSet<>(new AddressComparator());
// There is an idea of virtual / subinterfaces exposed by java here.
// We're not walking around looking for those because the javadoc says:
// "NOTE: can use getNetworkInterfaces()+getInetAddresses() to obtain all IP addresses for this node"
while (ifaces.hasMoreElements()) {
- NetworkInterface iface = ifaces.nextElement();
+ final NetworkInterface iface = ifaces.nextElement();
// if(iface.isUp()) { // leads to slowness and non-deterministic return values, so don't call isUp().
- Enumeration<InetAddress> addrs = iface.getInetAddresses();
+ final Enumeration<InetAddress> addrs = iface.getInetAddresses();
while (addrs.hasMoreElements()) {
- InetAddress a = addrs.nextElement();
+ final InetAddress a = addrs.nextElement();
if (a instanceof Inet4Address) {
sortedAddrs.add((Inet4Address) a);
}
@@ -80,7 +80,7 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider {
}
cached.set(sortedAddrs.pollFirst().getHostAddress());
LOG.log(Level.FINE, "Local address is {0}", cached.get());
- } catch (SocketException e) {
+ } catch (final SocketException e) {
throw new WakeRuntimeException("Unable to get local host address",
e.getCause());
}
@@ -98,14 +98,14 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider {
private static class AddressComparator implements Comparator<Inet4Address> {
// get unsigned byte.
- private static int u(byte b) {
+ private static int u(final byte b) {
return ((int) b); // & 0xff;
}
@Override
- public int compare(Inet4Address aa, Inet4Address ba) {
- byte[] a = aa.getAddress();
- byte[] b = ba.getAddress();
+ public int compare(final Inet4Address aa, final Inet4Address ba) {
+ final byte[] a = aa.getAddress();
+ final byte[] b = ba.getAddress();
// local subnet comes after all else.
if (a[0] == 127 && b[0] != 127) {
return 1;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
index e6f4a3c..b6d558a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
@@ -61,7 +61,7 @@ public final class LocalAddressProviderFactory {
LOGGER.log(Level.FINER, "Instantiating default LocalAddressProvider for legacy users.");
instance = Tang.Factory.getTang().newInjector().getInstance(LocalAddressProvider.class);
LOGGER.log(Level.FINER, "Instantiated default LocalAddressProvider for legacy users.");
- } catch (InjectionException e) {
+ } catch (final InjectionException e) {
throw new RuntimeException("Unable to instantiate default LocalAddressProvider for legacy users.", e);
}
assert (null != instance);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java
index e23ee3e..1516d8e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java
@@ -30,7 +30,7 @@ public class RemoteRuntimeException extends RuntimeException {
* @param s the detailed message
* @param e the cause
*/
- public RemoteRuntimeException(String s, Throwable e) {
+ public RemoteRuntimeException(final String s, final Throwable e) {
super(s, e);
}
@@ -39,7 +39,7 @@ public class RemoteRuntimeException extends RuntimeException {
*
* @param s the detailed message
*/
- public RemoteRuntimeException(String s) {
+ public RemoteRuntimeException(final String s) {
super(s);
}
@@ -48,7 +48,7 @@ public class RemoteRuntimeException extends RuntimeException {
*
* @param e the cause
*/
- public RemoteRuntimeException(Throwable e) {
+ public RemoteRuntimeException(final Throwable e) {
super(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java
index 5c24dd3..ec107ad 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java
@@ -32,7 +32,7 @@ public class ByteCodec implements Codec<byte[]> {
* @return the same bytes
*/
@Override
- public byte[] encode(byte[] obj) {
+ public byte[] encode(final byte[] obj) {
return obj;
}
@@ -43,7 +43,7 @@ public class ByteCodec implements Codec<byte[]> {
* @return the same bytes
*/
@Override
- public byte[] decode(byte[] buf) {
+ public byte[] decode(final byte[] buf) {
return buf;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java
index fc6d73c..5e664af 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java
@@ -27,7 +27,7 @@ public class ConnectFutureTask<T> extends FutureTask<T> {
private final EventHandler<ConnectFutureTask<T>> handler;
- public ConnectFutureTask(Callable<T> callable, EventHandler<ConnectFutureTask<T>> handler) {
+ public ConnectFutureTask(final Callable<T> callable, final EventHandler<ConnectFutureTask<T>> handler) {
super(callable);
this.handler = handler;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java
index b403891..ba8f638 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java
@@ -35,12 +35,13 @@ public class DefaultRemoteIdentifierFactoryImplementation extends DefaultIdentif
}
@SuppressWarnings({"unchecked", "rawtypes"})
- public DefaultRemoteIdentifierFactoryImplementation(Map<String, Class<? extends RemoteIdentifier>> typeToClazzMap) {
+ public DefaultRemoteIdentifierFactoryImplementation(
+ final Map<String, Class<? extends RemoteIdentifier>> typeToClazzMap) {
super((Map<String, Class<? extends Identifier>>) (Map) typeToClazzMap);
}
@Override
- public RemoteIdentifier getNewInstance(String str) {
+ public RemoteIdentifier getNewInstance(final String str) {
return (RemoteIdentifier) super.getNewInstance(str);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
index 2d63b34..0aa78c4 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
@@ -125,7 +125,8 @@ public final class DefaultRemoteManagerFactory implements RemoteManagerFactory {
@Override
@SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(String name, Codec<T> codec, EventHandler<Throwable> errorHandler) {
+ public <T> RemoteManager getInstance(
+ final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) {
return new DefaultRemoteManagerImplementation(name,
DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider
0, // Indicate to use the tcpPortProvider,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java
index 3929415..de2f23b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java
@@ -37,7 +37,7 @@ class DefaultRemoteMessage<T> implements RemoteMessage<T> {
* @param id the remote identifier
* @param message the message
*/
- DefaultRemoteMessage(RemoteIdentifier id, T message) {
+ DefaultRemoteMessage(final RemoteIdentifier id, final T message) {
this.id = id;
this.message = message;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
index 7f33847..49b0bac 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java
@@ -29,7 +29,7 @@ public class DefaultTransportEStage implements EStage<TransportEvent> {
}
@Override
- public void onNext(TransportEvent value) {
+ public void onNext(final TransportEvent value) {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java
index 1fad3cd..440b241 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java
@@ -41,12 +41,12 @@ public class MultiCodec<T> implements Codec<T> {
*
* @param clazzToDecoderMap
*/
- public MultiCodec(Map<Class<? extends T>, Codec<? extends T>> clazzToCodecMap) {
- Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap =
+ public MultiCodec(final Map<Class<? extends T>, Codec<? extends T>> clazzToCodecMap) {
+ final Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap =
new HashMap<Class<? extends T>, Encoder<? extends T>>();
- Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap =
+ final Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap =
new HashMap<Class<? extends T>, Decoder<? extends T>>();
- for (Class<? extends T> clazz : clazzToCodecMap.keySet()) {
+ for (final Class<? extends T> clazz : clazzToCodecMap.keySet()) {
clazzToEncoderMap.put(clazz, clazzToCodecMap.get(clazz));
clazzToDecoderMap.put(clazz, clazzToCodecMap.get(clazz));
}
@@ -60,7 +60,7 @@ public class MultiCodec<T> implements Codec<T> {
* @param obj
*/
@Override
- public byte[] encode(T obj) {
+ public byte[] encode(final T obj) {
return encoder.encode(obj);
}
@@ -70,7 +70,7 @@ public class MultiCodec<T> implements Codec<T> {
* @param data class name and byte payload
*/
@Override
- public T decode(byte[] data) {
+ public T decode(final byte[] data) {
return decoder.decode(data);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java
index d757d6a..1df9517 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java
@@ -39,7 +39,7 @@ public class MultiDecoder<T> implements Decoder<T> {
*
* @param clazzToDecoderMap
*/
- public MultiDecoder(Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap) {
+ public MultiDecoder(final Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap) {
this.clazzToDecoderMap = clazzToDecoderMap;
}
@@ -49,21 +49,21 @@ public class MultiDecoder<T> implements Decoder<T> {
* @param data class name and byte payload
*/
@Override
- public T decode(byte[] data) {
- WakeTuplePBuf tuple;
+ public T decode(final byte[] data) {
+ final WakeTuplePBuf tuple;
try {
tuple = WakeTuplePBuf.parseFrom(data);
- } catch (InvalidProtocolBufferException e) {
+ } catch (final InvalidProtocolBufferException e) {
e.printStackTrace();
throw new RemoteRuntimeException(e);
}
- String className = tuple.getClassName();
- byte[] message = tuple.getData().toByteArray();
- Class<?> clazz;
+ final String className = tuple.getClassName();
+ final byte[] message = tuple.getData().toByteArray();
+ final Class<?> clazz;
try {
clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
+ } catch (final ClassNotFoundException e) {
e.printStackTrace();
throw new RemoteRuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java
index 4e3489e..e3e3c46 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java
@@ -40,7 +40,7 @@ public class MultiEncoder<T> implements Encoder<T> {
*
* @param clazzToEncoderMap
*/
- public MultiEncoder(Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap) {
+ public MultiEncoder(final Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap) {
this.clazzToEncoderMap = clazzToEncoderMap;
}
@@ -50,13 +50,13 @@ public class MultiEncoder<T> implements Encoder<T> {
* @param obj
*/
@Override
- public byte[] encode(T obj) {
- Encoder<T> encoder = (Encoder<T>) clazzToEncoderMap.get(obj.getClass());
+ public byte[] encode(final T obj) {
+ final Encoder<T> encoder = (Encoder<T>) clazzToEncoderMap.get(obj.getClass());
if (encoder == null) {
throw new RemoteRuntimeException("Encoder for " + obj.getClass() + " not known.");
}
- WakeTuplePBuf.Builder tupleBuilder = WakeTuplePBuf.newBuilder();
+ final WakeTuplePBuf.Builder tupleBuilder = WakeTuplePBuf.newBuilder();
tupleBuilder.setClassName(obj.getClass().getName());
tupleBuilder.setData(ByteString.copyFrom(encoder.encode(obj)));
return tupleBuilder.build().toByteArray();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
index d5302f7..d695867 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
@@ -52,7 +52,8 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
* @param handler the handler of remote events
* @param errorHandler the exception handler
*/
- public OrderedRemoteReceiverStage(EventHandler<RemoteEvent<byte[]>> handler, EventHandler<Throwable> errorHandler) {
+ public OrderedRemoteReceiverStage(
+ final EventHandler<RemoteEvent<byte[]>> handler, final EventHandler<Throwable> errorHandler) {
this.streamMap = new ConcurrentHashMap<SocketAddress, OrderedEventStream>();
this.pushExecutor = Executors.newCachedThreadPool(
@@ -67,7 +68,7 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
}
@Override
- public void onNext(TransportEvent value) {
+ public void onNext(final TransportEvent value) {
LOG.log(Level.FINEST, "{0}", value);
pushStage.onNext(value);
}
@@ -82,10 +83,10 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
// wait for threads to finish for timeout
if (!pushExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
- List<Runnable> droppedRunnables = pushExecutor.shutdownNow();
+ final List<Runnable> droppedRunnables = pushExecutor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.log(Level.WARNING, "Close interrupted");
throw new RemoteRuntimeException(e);
}
@@ -97,10 +98,10 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
// wait for threads to finish for timeout
if (!pullExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
- List<Runnable> droppedRunnables = pullExecutor.shutdownNow();
+ final List<Runnable> droppedRunnables = pullExecutor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.log(Level.WARNING, "Close interrupted");
throw new RemoteRuntimeException(e);
}
@@ -116,16 +117,16 @@ class OrderedPushEventHandler implements EventHandler<TransportEvent> {
private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap; // per remote address
private final ThreadPoolStage<OrderedEventStream> pullStage;
- OrderedPushEventHandler(ConcurrentMap<SocketAddress, OrderedEventStream> streamMap,
- ThreadPoolStage<OrderedEventStream> pullStage) {
+ OrderedPushEventHandler(final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap,
+ final ThreadPoolStage<OrderedEventStream> pullStage) {
this.codec = new RemoteEventCodec<byte[]>(new ByteCodec());
this.streamMap = streamMap;
this.pullStage = pullStage;
}
@Override
- public void onNext(TransportEvent value) {
- RemoteEvent<byte[]> re = codec.decode(value.getData());
+ public void onNext(final TransportEvent value) {
+ final RemoteEvent<byte[]> re = codec.decode(value.getData());
re.setLocalAddress(value.getLocalAddress());
re.setRemoteAddress(value.getRemoteAddress());
@@ -135,7 +136,7 @@ class OrderedPushEventHandler implements EventHandler<TransportEvent> {
LOG.log(Level.FINER, "Value length is {0}", value.getData().length);
- SocketAddress addr = re.remoteAddress();
+ final SocketAddress addr = re.remoteAddress();
OrderedEventStream stream = streamMap.get(re.remoteAddress());
if (stream == null) {
stream = new OrderedEventStream();
@@ -154,12 +155,12 @@ class OrderedPullEventHandler implements EventHandler<OrderedEventStream> {
private final EventHandler<RemoteEvent<byte[]>> handler;
- OrderedPullEventHandler(EventHandler<RemoteEvent<byte[]>> handler) {
+ OrderedPullEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) {
this.handler = handler;
}
@Override
- public void onNext(OrderedEventStream stream) {
+ public void onNext(final OrderedEventStream stream) {
if (LOG.isLoggable(Level.FINER)) {
LOG.log(Level.FINER, "{0}", stream);
}
@@ -183,7 +184,7 @@ class OrderedEventStream {
nextSeq = 0;
}
- synchronized void add(RemoteEvent<byte[]> event) {
+ synchronized void add(final RemoteEvent<byte[]> event) {
queue.add(event);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java
index 8f763ae..72ba6d7 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java
@@ -48,8 +48,8 @@ public class ProxyEventHandler<T> implements EventHandler<T> {
* @param reStage the sender stage
* @throws RemoteRuntimeException
*/
- public ProxyEventHandler(RemoteIdentifier myId, RemoteIdentifier remoteId, String remoteSinkName,
- EventHandler<RemoteEvent<T>> handler, RemoteSeqNumGenerator seqGen) {
+ public ProxyEventHandler(final RemoteIdentifier myId, final RemoteIdentifier remoteId, final String remoteSinkName,
+ final EventHandler<RemoteEvent<T>> handler, final RemoteSeqNumGenerator seqGen) {
LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}",
new Object[]{myId, remoteId, remoteSinkName, handler});
if (!(myId instanceof SocketRemoteIdentifier && remoteId instanceof SocketRemoteIdentifier)) {
@@ -69,7 +69,7 @@ public class ProxyEventHandler<T> implements EventHandler<T> {
* @param event the event
*/
@Override
- public void onNext(T event) {
+ public void onNext(final T event) {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "remoteid: {0}\n{1}", new Object[]{remoteId.getSocketAddress(), event.toString()});
}
@@ -83,7 +83,7 @@ public class ProxyEventHandler<T> implements EventHandler<T> {
* @return a string representation of the object
*/
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append(this.getClass().getName());
builder.append(" remote_id=");
builder.append(remoteId.toString());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
index 51b35cd..b7e6dc3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
@@ -45,7 +45,8 @@ public class RemoteEvent<T> {
* @param seq the sequence number
* @param event the event
*/
- public RemoteEvent(SocketAddress localAddr, SocketAddress remoteAddr, String src, String sink, long seq, T event) {
+ public RemoteEvent(final SocketAddress localAddr, final SocketAddress remoteAddr, final String src,
+ final String sink, final long seq, final T event) {
this.localAddr = localAddr;
this.remoteAddr = remoteAddr;
this.src = src;
@@ -86,7 +87,7 @@ public class RemoteEvent<T> {
*
* @param name the source name
*/
- public void setSource(String name) {
+ public void setSource(final String name) {
src = name;
}
@@ -104,7 +105,7 @@ public class RemoteEvent<T> {
*
* @param name the sink name
*/
- public void setSink(String name) {
+ public void setSink(final String name) {
sink = name;
}
@@ -131,7 +132,7 @@ public class RemoteEvent<T> {
*
* @param addr the local socket address
*/
- public void setLocalAddress(SocketAddress addr) {
+ public void setLocalAddress(final SocketAddress addr) {
localAddr = addr;
}
@@ -140,7 +141,7 @@ public class RemoteEvent<T> {
*
* @param addr the remote socket address
*/
- public void setRemoteAddress(SocketAddress addr) {
+ public void setRemoteAddress(final SocketAddress addr) {
remoteAddr = addr;
}
@@ -150,7 +151,7 @@ public class RemoteEvent<T> {
* @return a string representation of this object
*/
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("RemoteEvent");
builder.append(" localAddr=");
builder.append(localAddr);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java
index b2ca25f..b3e2023 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java
@@ -35,7 +35,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> {
*
* @param codec the codec for the event
*/
- public RemoteEventCodec(Codec<T> codec) {
+ public RemoteEventCodec(final Codec<T> codec) {
encoder = new RemoteEventEncoder<T>(codec);
decoder = new RemoteEventDecoder<T>(codec);
}
@@ -47,7 +47,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> {
* @returns bytes
*/
@Override
- public byte[] encode(RemoteEvent<T> obj) {
+ public byte[] encode(final RemoteEvent<T> obj) {
return encoder.encode(obj);
}
@@ -58,7 +58,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> {
* @return a remote event object
*/
@Override
- public RemoteEvent<T> decode(byte[] data) {
+ public RemoteEvent<T> decode(final byte[] data) {
return decoder.decode(data);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java
index 83eb6ce..049b4d6 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java
@@ -26,7 +26,7 @@ import java.util.Comparator;
public class RemoteEventComparator<T> implements Comparator<RemoteEvent<T>> {
@Override
- public int compare(RemoteEvent<T> o1, RemoteEvent<T> o2) {
+ public int compare(final RemoteEvent<T> o1, final RemoteEvent<T> o2) {
if (o1.getSeq() < o2.getSeq()) {
return -1;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
index 7abf64e..3ef0a27 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
@@ -37,7 +37,7 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> {
*
* @param decoder the decoder of the event
*/
- public RemoteEventDecoder(Decoder<T> decoder) {
+ public RemoteEventDecoder(final Decoder<T> decoder) {
this.decoder = decoder;
}
@@ -49,13 +49,13 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> {
* @throws RemoteRuntimeException
*/
@Override
- public RemoteEvent<T> decode(byte[] data) {
- WakeMessagePBuf pbuf;
+ public RemoteEvent<T> decode(final byte[] data) {
+ final WakeMessagePBuf pbuf;
try {
pbuf = WakeMessagePBuf.parseFrom(data);
return new RemoteEvent<T>(null, null, pbuf.getSource(), pbuf.getSink(), pbuf.getSeq(),
decoder.decode(pbuf.getData().toByteArray()));
- } catch (InvalidProtocolBufferException e) {
+ } catch (final InvalidProtocolBufferException e) {
throw new RemoteRuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
index d5a777b..db0c78a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
@@ -37,7 +37,7 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> {
*
* @param encoder the encoder of the event
*/
- public RemoteEventEncoder(Encoder<T> encoder) {
+ public RemoteEventEncoder(final Encoder<T> encoder) {
this.encoder = encoder;
}
@@ -49,7 +49,7 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> {
* @throws RemoteRuntimeException
*/
@Override
- public byte[] encode(RemoteEvent<T> obj) {
+ public byte[] encode(final RemoteEvent<T> obj) {
if (obj.getSink() == null) {
throw new RemoteRuntimeException("Sink stage is null");
}
@@ -57,8 +57,8 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> {
throw new RemoteRuntimeException("Event is null");
}
- WakeMessagePBuf.Builder builder = WakeMessagePBuf.newBuilder();
- String source = obj.getSource() == null ? "" : obj.getSource();
+ final WakeMessagePBuf.Builder builder = WakeMessagePBuf.newBuilder();
+ final String source = obj.getSource() == null ? "" : obj.getSource();
builder.setSource(source);
builder.setSink(obj.getSink());
builder.setSeq(obj.getSeq());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java
index c75b30f..bbb1217 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java
@@ -38,7 +38,7 @@ class RemoteReceiverEventHandler implements EventHandler<TransportEvent> {
*
* @param handler the upstream handler
*/
- RemoteReceiverEventHandler(EventHandler<RemoteEvent<byte[]>> handler) {
+ RemoteReceiverEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) {
this.codec = new RemoteEventCodec<byte[]>(new ByteCodec());
this.handler = handler;
}
@@ -49,8 +49,8 @@ class RemoteReceiverEventHandler implements EventHandler<TransportEvent> {
* @param e the event
*/
@Override
- public void onNext(TransportEvent e) {
- RemoteEvent<byte[]> re = codec.decode(e.getData());
+ public void onNext(final TransportEvent e) {
+ final RemoteEvent<byte[]> re = codec.decode(e.getData());
re.setLocalAddress(e.getLocalAddress());
re.setRemoteAddress(e.getRemoteAddress());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java
index 1f19f3d..4ea47ce 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java
@@ -69,7 +69,7 @@ public class RemoteReceiverStage implements EStage<TransportEvent> {
* @param value the event
*/
@Override
- public void onNext(TransportEvent value) {
+ public void onNext(final TransportEvent value) {
LOG.log(Level.FINEST, "{0}", value);
stage.onNext(value);
}
@@ -87,7 +87,7 @@ public class RemoteReceiverStage implements EStage<TransportEvent> {
// wait for threads to finish for timeout
if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", shutdownTimeout);
- List<Runnable> droppedRunnables = executor.shutdownNow();
+ final List<Runnable> droppedRunnables = executor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped {0} tasks.", droppedRunnables.size());
}
} catch (final InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
index 65b9461..057caac 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
@@ -53,7 +53,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
* @param transport the transport to send events
* @param executor the executor service used for creating channels
*/
- RemoteSenderEventHandler(Encoder<T> encoder, Transport transport, ExecutorService executor) {
+ RemoteSenderEventHandler(final Encoder<T> encoder, final Transport transport, final ExecutorService executor) {
this.encoder = new RemoteEventEncoder<T>(encoder);
this.transport = transport;
this.executor = executor;
@@ -61,7 +61,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
this.queue = new LinkedBlockingQueue<RemoteEvent<T>>();
}
- void setLink(Link<byte[]> link) {
+ void setLink(final Link<byte[]> link) {
LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link});
linkRef.compareAndSet(null, link);
consumeQueue();
@@ -74,7 +74,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
LOG.log(Level.FINEST, "{0}", event);
linkRef.get().write(encoder.encode(event));
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
e.printStackTrace();
throw new RemoteRuntimeException(e);
}
@@ -87,19 +87,19 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
* @throws RemoteRuntimeException
*/
@Override
- public void onNext(RemoteEvent<T> value) {
+ public void onNext(final RemoteEvent<T> value) {
try {
if (linkRef.get() == null) {
queue.add(value);
- Link<byte[]> link = transport.get(value.remoteAddress());
+ final Link<byte[]> link = transport.get(value.remoteAddress());
if (link != null) {
LOG.log(Level.FINEST, "transport get link: {0}", link);
setLink(link);
return;
}
- ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<Link<byte[]>>(
+ final ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<Link<byte[]>>(
new ConnectCallable(transport, value.localAddress(), value.remoteAddress()),
new ConnectEventHandler<T>(this));
executor.submit(cf);
@@ -113,7 +113,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
}
linkRef.get().write(encoder.encode(value));
}
- } catch (RemoteRuntimeException ex2) {
+ } catch (final RemoteRuntimeException ex2) {
ex2.printStackTrace();
throw ex2;
}
@@ -128,7 +128,7 @@ class ConnectCallable implements Callable<Link<byte[]>> {
private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
- ConnectCallable(Transport transport, SocketAddress localAddress, SocketAddress remoteAddress) {
+ ConnectCallable(final Transport transport, final SocketAddress localAddress, final SocketAddress remoteAddress) {
this.transport = transport;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
@@ -146,12 +146,12 @@ class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte
private final RemoteSenderEventHandler<T> handler;
- ConnectEventHandler(RemoteSenderEventHandler<T> handler) {
+ ConnectEventHandler(final RemoteSenderEventHandler<T> handler) {
this.handler = handler;
}
@Override
- public void onNext(ConnectFutureTask<Link<byte[]>> value) {
+ public void onNext(final ConnectFutureTask<Link<byte[]>> value) {
try {
handler.setLink(value.get());
} catch (InterruptedException | ExecutionException e) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java
index 5c38471..d30c8ee 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java
@@ -35,7 +35,7 @@ public class RemoteSeqNumGenerator {
seqMap = new ConcurrentHashMap<SocketAddress, AtomicLong>();
}
- public long getNextSeq(SocketAddress addr) {
+ public long getNextSeq(final SocketAddress addr) {
AtomicLong seq = seqMap.get(addr);
if (seq == null) {
seq = new AtomicLong(0);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java
index 1ae7478..0914813 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java
@@ -35,7 +35,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier {
*
* @param addr the socket address
*/
- public SocketRemoteIdentifier(InetSocketAddress addr) {
+ public SocketRemoteIdentifier(final InetSocketAddress addr) {
this.addr = addr;
}
@@ -45,20 +45,20 @@ public class SocketRemoteIdentifier implements RemoteIdentifier {
* @param str the string representation
* @throws RemoteRuntimeException
*/
- public SocketRemoteIdentifier(String str) {
+ public SocketRemoteIdentifier(final String str) {
int index = str.indexOf("0:0:0:0:0:0:0:0:");
if (index >= 0) {
- String host = str.substring(0, 15);
- int port = Integer.parseInt(str.substring(index + 16));
+ final String host = str.substring(0, 15);
+ final int port = Integer.parseInt(str.substring(index + 16));
this.addr = new InetSocketAddress(host, port);
} else {
index = str.indexOf(":");
if (index <= 0) {
throw new RemoteRuntimeException("Invalid name " + str);
}
- String host = str.substring(0, index);
- int port = Integer.parseInt(str.substring(index + 1));
+ final String host = str.substring(0, index);
+ final int port = Integer.parseInt(str.substring(index + 1));
this.addr = new InetSocketAddress(host, port);
}
}
@@ -89,7 +89,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier {
* @return true if the object is the same as the object argument; false, otherwise
*/
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
return addr.equals(((SocketRemoteIdentifier) o).getSocketAddress());
}
@@ -100,7 +100,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier {
*/
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("socket://");
builder.append(addr.getHostString());
builder.append(":");
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java
index 70f8403..d972a33 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java
@@ -32,7 +32,7 @@ public class StringCodec implements Codec<String> {
* @return a byte array representation of the string
*/
@Override
- public byte[] encode(String obj) {
+ public byte[] encode(final String obj) {
return obj.getBytes();
}
@@ -43,7 +43,7 @@ public class StringCodec implements Codec<String> {
* @return a string
*/
@Override
- public String decode(byte[] buf) {
+ public String decode(final byte[] buf) {
return new String(buf);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
index b3e06e6..5ae498a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
@@ -34,7 +34,7 @@ public class Subscription<T> implements AutoCloseable {
* @param token the token for finding the subscription
* @param handlerContainer the container managing handlers
*/
- public Subscription(T token, HandlerContainer<T> handlerContainer) {
+ public Subscription(final T token, final HandlerContainer<T> handlerContainer) {
this.token = token;
this.container = handlerContainer;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
index 3736017..5f6b15e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
@@ -39,7 +39,7 @@ public class TransportEvent {
* @param localAddr the local socket address
* @param remoteAddr the remote socket address
*/
- public TransportEvent(byte[] data, SocketAddress localAddr, SocketAddress remoteAddr) {
+ public TransportEvent(final byte[] data, final SocketAddress localAddr, final SocketAddress remoteAddr) {
this.data = data;
this.localAddr = localAddr;
this.remoteAddr = remoteAddr;
@@ -53,7 +53,7 @@ public class TransportEvent {
* @param data
* @param link
*/
- public TransportEvent(byte[] data, Link<byte[]> link) {
+ public TransportEvent(final byte[] data, final Link<byte[]> link) {
this.data = data;
this.link = link;
if (this.link != null) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java
index 334f319..13f1d71 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java
@@ -29,7 +29,7 @@ public class Tuple2<T1, T2> {
private final T1 t1;
private final T2 t2;
- public Tuple2(T1 t1, T2 t2) {
+ public Tuple2(final T1 t1, final T2 t2) {
this.t1 = t1;
this.t2 = t2;
}
@@ -48,8 +48,8 @@ public class Tuple2<T1, T2> {
}
@Override
- public boolean equals(Object o) {
- Tuple2<T1, T2> tuple = (Tuple2<T1, T2>) o;
+ public boolean equals(final Object o) {
+ final Tuple2<T1, T2> tuple = (Tuple2<T1, T2>) o;
return t1.equals((Object) tuple.getT1()) && t2.equals((Object) tuple.getT2());
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
index 2cf4a7a..635b8c1 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
@@ -35,7 +35,7 @@ final class RandomRangeIterator implements Iterator<Integer> {
private int currentRetryCount;
private final Random random = new Random(System.currentTimeMillis());
- RandomRangeIterator(final int tcpPortRangeBegin, final int tcpPortRangeCount, int tryCount) {
+ RandomRangeIterator(final int tcpPortRangeBegin, final int tcpPortRangeCount, final int tryCount) {
this.tcpPortRangeBegin = tcpPortRangeBegin;
this.tcpPortRangeCount = tcpPortRangeCount;
this.tryCount = tryCount;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java
index bddc167..d8c329b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java
@@ -30,7 +30,7 @@ public class TransportRuntimeException extends RuntimeException {
* @param s the detailed message
* @param e the cause
*/
- public TransportRuntimeException(String s, Throwable e) {
+ public TransportRuntimeException(final String s, final Throwable e) {
super(s, e);
}
@@ -39,7 +39,7 @@ public class TransportRuntimeException extends RuntimeException {
*
* @param s the detailed message
*/
- public TransportRuntimeException(String s) {
+ public TransportRuntimeException(final String s) {
super(s);
}
@@ -48,7 +48,7 @@ public class TransportRuntimeException extends RuntimeException {
*
* @param e the cause
*/
- public TransportRuntimeException(Throwable e) {
+ public TransportRuntimeException(final Throwable e) {
super(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java
index 7d77b76..8dea218 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java
@@ -54,7 +54,7 @@ abstract class AbstractNettyEventListener implements NettyEventListener {
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Channel channel = ctx.channel();
final byte[] message = (byte[]) msg;
@@ -70,7 +70,7 @@ abstract class AbstractNettyEventListener implements NettyEventListener {
}
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) {
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
final Channel channel = ctx.channel();
LOG.log(Level.WARNING, "ExceptionEvent: local: {0} remote: {1} :: {2}", new Object[]{
channel.localAddress(), channel.remoteAddress(), cause});
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java
index 2536ea5..0cca5e1 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java
@@ -29,7 +29,7 @@ public class ByteEncoder implements Encoder<byte[]> {
* @see org.apache.reef.wake.remote.Encoder#encode(java.lang.Object)
*/
@Override
- public byte[] encode(byte[] obj) {
+ public byte[] encode(final byte[] obj) {
return obj;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java
index df15f87..f55bbeb 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java
@@ -64,11 +64,11 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
* org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent)
*/
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof byte[]) {
- byte[] data = (byte[]) msg;
+ final byte[] data = (byte[]) msg;
if (start) {
//LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName);
@@ -93,7 +93,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
// "Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize});
//if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking",
// curThrName);
- byte[] temp = retArr;
+ final byte[] temp = retArr;
start = true;
expectedSize = 0;
readBuffer.release();
@@ -117,7 +117,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
* the second begins.
*/
@Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
@@ -183,7 +183,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler {
private class ByteBufCloseableStream extends ByteBufInputStream {
private final ByteBuf buffer;
- public ByteBufCloseableStream(ByteBuf buffer) {
+ public ByteBufCloseableStream(final ByteBuf buffer) {
super(buffer);
this.buffer = buffer;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
index a9634a6..4c6626c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
@@ -37,7 +37,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
* Called when the sent message is transferred successfully.
*/
@Override
- public void onSuccess(T message) {
+ public void onSuccess(final T message) {
if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message});
}
@@ -47,7 +47,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
* Called when the sent message to remoteAddress is failed to be transferred.
*/
@Override
- public void onException(Throwable cause, SocketAddress remoteAddress, T message) {
+ public void onException(final Throwable cause, final SocketAddress remoteAddress, final T message) {
if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}"
, new Object[]{remoteAddress, message, cause});
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
index 8921486..5b8191c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java
@@ -73,7 +73,7 @@ public class MessagingTransportFactory implements TransportFactory {
final EventHandler<TransportEvent> serverHandler,
final EventHandler<Exception> exHandler) {
- Injector injector = Tang.Factory.getTang().newInjector();
+ final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler));
@@ -84,13 +84,13 @@ public class MessagingTransportFactory implements TransportFactory {
transport = injector.getInstance(NettyMessagingTransport.class);
transport.registerErrorHandler(exHandler);
return transport;
- } catch (InjectionException e) {
+ } catch (final InjectionException e) {
throw new RuntimeException(e);
}
}
@Override
- public Transport newInstance(final String hostAddress, int port,
+ public Transport newInstance(final String hostAddress, final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
@@ -100,14 +100,14 @@ public class MessagingTransportFactory implements TransportFactory {
}
@Override
- public Transport newInstance(final String hostAddress, int port,
+ public Transport newInstance(final String hostAddress, final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider) {
- Injector injector = Tang.Factory.getTang().newInjector();
+ final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage);
@@ -117,7 +117,7 @@ public class MessagingTransportFactory implements TransportFactory {
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
try {
return injector.getInstance(NettyMessagingTransport.class);
- } catch (InjectionException e) {
+ } catch (final InjectionException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java
index 0f00ea7..011b9c2 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java
@@ -40,7 +40,7 @@ class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
}
@Override
- protected void initChannel(SocketChannel ch) throws Exception {
+ protected void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
index cb80c7f..833ad3c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
@@ -77,7 +77,7 @@ public class NettyLink<T> implements Link<T> {
@Override
public void write(final T message) {
LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message});
- byte[] allData = encoder.encode(message);
+ final byte[] allData = encoder.encode(message);
// byte[] -> ByteBuf
if (listener != null) {
channel.writeAndFlush(Unpooled.wrappedBuffer(allData))
@@ -124,7 +124,7 @@ class NettyChannelFutureListener<T> implements ChannelFutureListener {
}
@Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
listener.onSuccess(message);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index 33d7137..828a10b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -114,7 +114,7 @@ public class NettyMessagingTransport implements Transport {
@Deprecated
public NettyMessagingTransport(
final String hostAddress,
- int port,
+ final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
@@ -138,7 +138,7 @@ public class NettyMessagingTransport implements Transport {
@Inject
NettyMessagingTransport(
@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
- @Parameter(RemoteConfiguration.Port.class) int port,
+ @Parameter(RemoteConfiguration.Port.class) final int port,
@Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage,
@Parameter(RemoteConfiguration.RemoteServerStage.class) final EStage<TransportEvent> serverStage,
@Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
@@ -146,8 +146,9 @@ public class NettyMessagingTransport implements Transport {
final TcpPortProvider tcpPortProvider,
final LocalAddressProvider localAddressProvider) {
- if (port < 0) {
- throw new RemoteRuntimeException("Invalid server port: " + port);
+ int p = port;
+ if (p < 0) {
+ throw new RemoteRuntimeException("Invalid server port: " + p);
}
final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
@@ -181,25 +182,25 @@ public class NettyMessagingTransport implements Transport {
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
- LOG.log(Level.FINE, "Binding to {0}", port);
+ LOG.log(Level.FINE, "Binding to {0}", p);
Channel acceptorFound = null;
try {
- if (port > 0) {
- acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel();
+ if (p > 0) {
+ acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
} else {
- Iterator<Integer> ports = tcpPortProvider.iterator();
+ final Iterator<Integer> ports = tcpPortProvider.iterator();
while (acceptorFound == null) {
if (!ports.hasNext()) {
break;
}
- port = ports.next();
- LOG.log(Level.FINEST, "Try port {0}", port);
+ p = ports.next();
+ LOG.log(Level.FINEST, "Try port {0}", p);
try {
- acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel();
+ acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
} catch (final Exception ex) {
if (ex instanceof BindException) {
- LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port);
+ LOG.log(Level.FINEST, "The port {0} is already bound. Try again", p);
} else {
throw ex;
}
@@ -208,8 +209,8 @@ public class NettyMessagingTransport implements Transport {
}
} catch (final Exception ex) {
final RuntimeException transportException =
- new TransportRuntimeException("Cannot bind to port " + port);
- LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex);
+ new TransportRuntimeException("Cannot bind to port " + p);
+ LOG.log(Level.SEVERE, "Cannot bind to port " + p, ex);
this.clientWorkerGroup.shutdownGracefully();
this.serverBossGroup.shutdownGracefully();
@@ -218,7 +219,7 @@ public class NettyMessagingTransport implements Transport {
}
this.acceptor = acceptorFound;
- this.serverPort = port;
+ this.serverPort = p;
this.localAddress = new InetSocketAddress(host, this.serverPort);
LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
@@ -236,7 +237,7 @@ public class NettyMessagingTransport implements Transport {
* @deprecated use the constructor that takes a TcpProvider and LocalAddressProvider instead.
*/
@Deprecated
- public NettyMessagingTransport(final String hostAddress, int port,
+ public NettyMessagingTransport(final String hostAddress, final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java
index a03a9a4..e076552 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java
@@ -39,7 +39,7 @@ public abstract class AbstractRxStage<T> implements RxStage<T> {
*
* @param stageName the stage name
*/
- public AbstractRxStage(String stageName) {
+ public AbstractRxStage(final String stageName) {
this.closed = new AtomicBoolean(false);
this.name = stageName;
this.inMeter = new Meter(stageName + "_in");
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java
index e287e27..d3ecd8a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java
@@ -53,8 +53,8 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> {
* @param observer the observer
*/
@Inject
- public RxSyncStage(@Parameter(StageName.class) String name,
- @Parameter(StageObserver.class) Observer<T> observer) {
+ public RxSyncStage(@Parameter(StageName.class) final String name,
+ @Parameter(StageObserver.class) final Observer<T> observer) {
super(name);
this.observer = observer;
StageManager.instance().register(this);
@@ -66,7 +66,7 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> {
* @param value the new value
*/
@Override
- public void onNext(T value) {
+ public void onNext(final T value) {
beforeOnNext();
observer.onNext(value);
afterOnNext();
@@ -79,7 +79,7 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> {
* @param error the error
*/
@Override
- public void onError(Exception error) {
+ public void onError(final Exception error) {
observer.onError(error);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java
index 82d1208..01142b3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java
@@ -156,7 +156,7 @@ public final class RxThreadPoolStage<T> extends AbstractRxStage<T> {
LOG.log(Level.SEVERE, "Executor terminated due to unrequired timeout");
observer.onError(new TimeoutException());
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
e.printStackTrace();
observer.onError(e);
}
@@ -177,12 +177,12 @@ public final class RxThreadPoolStage<T> extends AbstractRxStage<T> {
completionExecutor.shutdown();
if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
- List<Runnable> droppedRunnables = executor.shutdownNow();
+ final List<Runnable> droppedRunnables = executor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
}
if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
- List<Runnable> droppedRunnables = completionExecutor.shutdownNow();
+ final List<Runnable> droppedRunnables = completionExecutor.shutdownNow();
LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java
index 62e2259..c83850c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java
@@ -38,7 +38,7 @@ public class SimpleSubject<T> implements Subject<T, T> {
* @param observer the observer
*/
@Inject
- public SimpleSubject(Observer<T> observer) {
+ public SimpleSubject(final Observer<T> observer) {
this.observer = observer;
}
@@ -48,7 +48,7 @@ public class SimpleSubject<T> implements Subject<T, T> {
* @param value the new value
*/
@Override
- public void onNext(T value) {
+ public void onNext(final T value) {
this.observer.onNext(value);
}
@@ -58,7 +58,7 @@ public class SimpleSubject<T> implements Subject<T, T> {
* @param error the error
*/
@Override
- public void onError(Exception error) {
+ public void onError(final Exception error) {
this.observer.onError(error);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
index 5d2d3c9..e43b681 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
@@ -35,12 +35,12 @@ public class TimeoutSubject<T> implements Subject<T, T> {
this.timeBomb = new Thread(new Runnable() {
@Override
public void run() {
- boolean finishedCopy;
+ final boolean finishedCopy;
synchronized (outer) {
if (!finished) {
try {
outer.wait(timeout);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
return;
}
}
@@ -56,8 +56,8 @@ public class TimeoutSubject<T> implements Subject<T, T> {
}
@Override
- public void onNext(T value) {
- boolean wasFinished;
+ public void onNext(final T value) {
+ final boolean wasFinished;
synchronized (this) {
wasFinished = finished;
if (!finished) {
@@ -73,7 +73,7 @@ public class TimeoutSubject<T> implements Subject<T, T> {
}
@Override
- public void onError(Exception error) {
+ public void onError(final Exception error) {
this.timeBomb.interrupt();
destination.onError(error);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java
index c28c22a..a0779ae 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java
@@ -21,11 +21,11 @@ package org.apache.reef.wake.storage;
import java.io.FileInputStream;
public class FileHandlePool {
- public FileInputStream get(StorageIdentifier f) {
+ public FileInputStream get(final StorageIdentifier f) {
return null;
}
- public void release(StorageIdentifier f, FileInputStream is) {
+ public void release(final StorageIdentifier f, final FileInputStream is) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java
index 5071196..ac980c3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java
@@ -25,7 +25,7 @@ import java.net.URISyntaxException;
public class FileIdentifier implements StorageIdentifier {
private final File f;
- public FileIdentifier(String s) throws URISyntaxException {
+ public FileIdentifier(final String s) throws URISyntaxException {
f = new File(new URI(s));
}
@@ -35,7 +35,7 @@ public class FileIdentifier implements StorageIdentifier {
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (!(o instanceof FileIdentifier)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java
index e84e483..2221f74 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java
@@ -27,7 +27,7 @@ public class ReadRequest implements Identifiable {
final byte[] buf;
final Identifier id;
- public ReadRequest(StorageIdentifier f, long offset, byte[] buf, Identifier id) {
+ public ReadRequest(final StorageIdentifier f, final long offset, final byte[] buf, final Identifier id) {
this.f = f;
this.offset = offset;
this.buf = buf;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java
index e52f562..a864f53 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java
@@ -29,22 +29,22 @@ public class SequentialFileReader implements EStage<ReadRequest> {
final FileHandlePool fdPool = new FileHandlePool();
@Override
- public void onNext(ReadRequest value) {
- FileInputStream fin = fdPool.get(value.f);
+ public void onNext(final ReadRequest value) {
+ final FileInputStream fin = fdPool.get(value.f);
int readSoFar = 0;
try {
synchronized (fin) {
fin.reset();
fin.skip(value.offset);
while (readSoFar != value.buf.length) {
- int ret = fin.read(value.buf, readSoFar, value.buf.length);
+ final int ret = fin.read(value.buf, readSoFar, value.buf.length);
if (ret == -1) {
break;
}
readSoFar += ret;
}
}
- } catch (IOException e) {
+ } catch (final IOException e) {
fdPool.release(value.f, fin);
// err.onNext(null); //new ReadError(e));
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
index a174461..44e90b5 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
@@ -39,7 +39,7 @@ public abstract class Time implements Comparable<Time> {
}
@Override
- public final int compareTo(Time o) {
+ public final int compareTo(final Time o) {
if (this.timestamp < o.timestamp) {
return -1;
}
@@ -56,7 +56,7 @@ public abstract class Time implements Comparable<Time> {
}
@Override
- public final boolean equals(Object o) {
+ public final boolean equals(final Object o) {
if (o instanceof Time) {
return compareTo((Time) o) == 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
index cc6bee9..6ffbce8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
@@ -34,13 +34,13 @@ public final class LogicalTimer implements Timer {
}
@Override
- public long getDuration(long time) {
+ public long getDuration(final long time) {
isReady(time);
return 0;
}
@Override
- public boolean isReady(long time) {
+ public boolean isReady(final long time) {
if (this.current < time) {
this.current = time;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index 6b162bc..65c75ac 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -136,7 +136,7 @@ public final class RuntimeClock implements Clock {
*/
private long findAcceptableStopTime() {
long time = timer.getCurrent();
- for (Time t : this.schedule) {
+ for (final Time t : this.schedule) {
if (t instanceof ClientAlarm) {
assert (time <= t.getTimeStamp());
time = t.getTimeStamp();
@@ -149,7 +149,7 @@ public final class RuntimeClock implements Clock {
@Override
public boolean isIdle() {
synchronized (this.schedule) {
- for (Time t : this.schedule) {
+ for (final Time t : this.schedule) {
if (t instanceof ClientAlarm) {
return false;
}
@@ -237,12 +237,12 @@ public final class RuntimeClock implements Clock {
break; // we're done.
}
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
// waiting interrupted - return to loop
}
}
this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
} finally {