You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/12/02 05:58:26 UTC
[2/3] incubator-apex-core git commit: APEX-273 - Fix existing
checkstyle violations in bufferserver module
APEX-273 - Fix existing checkstyle violations in bufferserver module
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/799df6c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/799df6c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/799df6c0
Branch: refs/heads/devel-3
Commit: 799df6c0aba94c20bef0eb2d975d3f5649b02f54
Parents: 892355c
Author: MalharJenkins <je...@datatorrent.com>
Authored: Thu Nov 19 15:44:13 2015 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Nov 20 18:31:15 2015 -0800
----------------------------------------------------------------------
bufferserver/pom.xml | 3 +-
.../bufferserver/auth/AuthManager.java | 2 +-
.../bufferserver/client/AuthClient.java | 6 +-
.../bufferserver/client/Controller.java | 1 -
.../bufferserver/client/Subscriber.java | 15 +---
.../bufferserver/internal/DataList.java | 90 ++++++++++++--------
.../bufferserver/internal/DataListener.java | 10 +--
.../bufferserver/internal/FastDataList.java | 12 +--
.../bufferserver/internal/LogicalNode.java | 29 ++++---
.../bufferserver/internal/PhysicalNode.java | 3 +-
.../bufferserver/packet/DataTuple.java | 8 +-
.../bufferserver/packet/EmptyTuple.java | 10 +--
.../packet/GenericRequestTuple.java | 20 ++---
.../bufferserver/packet/MessageType.java | 17 +++-
.../bufferserver/packet/PayloadTuple.java | 6 +-
.../packet/PublishRequestTuple.java | 5 +-
.../bufferserver/packet/RequestTuple.java | 6 +-
.../bufferserver/packet/ResetRequestTuple.java | 5 +-
.../bufferserver/packet/ResetWindowTuple.java | 6 +-
.../packet/SubscribeRequestTuple.java | 59 +++++--------
.../datatorrent/bufferserver/packet/Tuple.java | 15 ++--
.../bufferserver/packet/WindowIdTuple.java | 8 +-
.../bufferserver/policy/AbstractPolicy.java | 12 +--
.../bufferserver/policy/GiveAll.java | 2 +-
.../bufferserver/policy/LeastBusy.java | 14 +--
.../datatorrent/bufferserver/policy/Policy.java | 10 +--
.../bufferserver/policy/RandomOne.java | 12 +--
.../bufferserver/policy/RoundRobin.java | 18 ++--
.../datatorrent/bufferserver/server/Server.java | 44 ++++++----
.../bufferserver/storage/DiskStorage.java | 73 ++++++----------
.../bufferserver/util/SerializedData.java | 4 +-
.../datatorrent/bufferserver/util/System.java | 12 ++-
.../datatorrent/bufferserver/util/VarInt.java | 9 +-
.../bufferserver/client/SubscriberTest.java | 39 ++++-----
.../bufferserver/packet/NoMessageTupleTest.java | 23 +----
.../packet/PublishRequestTupleTest.java | 10 +--
.../packet/ResetWindowTupleTest.java | 10 +--
.../packet/SubscribeRequestTupleTest.java | 20 +++--
.../bufferserver/server/ServerTest.java | 67 ++++++++-------
.../bufferserver/storage/DiskStorageTest.java | 11 +--
.../bufferserver/support/Subscriber.java | 9 +-
.../bufferserver/util/CodecTest.java | 4 +-
42 files changed, 346 insertions(+), 393 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/pom.xml
----------------------------------------------------------------------
diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml
index f6fc8b3..efc5b5e 100644
--- a/bufferserver/pom.xml
+++ b/bufferserver/pom.xml
@@ -48,10 +48,9 @@
</plugin>
-->
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>228</maxAllowedViolations>
+ <consoleOutput>true</consoleOutput>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
index 453befa..942a896 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
@@ -27,7 +27,7 @@ import java.security.SecureRandom;
*/
public class AuthManager
{
- private final static int BUFFER_SERVER_TOKEN_LENGTH = 20;
+ private static final int BUFFER_SERVER_TOKEN_LENGTH = 20;
private static SecureRandom generator = new SecureRandom();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
index fc105b2..4465143 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
@@ -45,7 +45,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient
super(readbuffer, position, sendBufferSize);
}
- protected void sendAuthenticate() {
+ protected void sendAuthenticate()
+ {
if (token != null) {
write(token);
}
@@ -65,7 +66,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient
}
}
if (!authenticated) {
- throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials.");
+ throw new AccessControlException("Buffer server security is enabled." +
+ " Access is restricted without proper credentials.");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
index d2faf69..0c3bac9 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
@@ -26,7 +26,6 @@ import com.datatorrent.bufferserver.packet.PurgeRequestTuple;
import com.datatorrent.bufferserver.packet.ResetRequestTuple;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.util.Slice;
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
index df91f0b..2b18d04 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
+import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest;
/**
*
@@ -46,18 +46,11 @@ public abstract class Subscriber extends AuthClient
this.id = id;
}
- public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize)
+ public void activate(final String version, final String type, final String sourceId, final int mask,
+ final Collection<Integer> partitions, final long windowId, final int bufferSize)
{
sendAuthenticate();
- write(SubscribeRequestTuple.getSerializedRequest(
- version,
- id,
- type,
- sourceId,
- mask,
- partitions,
- windowId,
- bufferSize));
+ write(getSerializedRequest(version, id, type, sourceId, mask, partitions, windowId, bufferSize));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 1f6c273..fa20aa2 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -90,8 +90,8 @@ public class DataList
public DataList(String identifier)
{
/*
- * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem.
- * we will use default value of 8 block sizes to be cached in memory
+ * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block
+ * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory
*/
this(identifier, 64 * 1024 * 1024, 8);
}
@@ -104,7 +104,8 @@ public class DataList
public void rewind(final int baseSeconds, final int windowId) throws IOException
{
final long longWindowId = (long)baseSeconds << 32 | windowId;
- logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId));
+ logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window),
+ Codec.getStringWindowId(longWindowId));
int numberOfInMemBlockRewound = 0;
synchronized (this) {
@@ -139,13 +140,16 @@ public class DataList
}
/*
- TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block.
- */
+ * TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last
+ * block.
+ */
final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound);
- assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+ numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
resumeSuspendedClients(numberOfInMemBlockPermits);
- logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
+ logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after" +
+ " rewinding {}.", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
}
@@ -177,11 +181,13 @@ public class DataList
public void purge(final int baseSeconds, final int windowId)
{
final long longWindowId = (long)baseSeconds << 32 | windowId;
- logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId));
+ logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window),
+ Codec.getStringWindowId(longWindowId));
int numberOfInMemBlockPurged = 0;
synchronized (this) {
- for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) {
+ for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId;
+ prev = temp, temp = temp.next) {
if (temp.ending_window > longWindowId || temp == last) {
if (prev != null) {
first = temp;
@@ -204,9 +210,11 @@ public class DataList
}
final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged);
- assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+ numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
resumeSuspendedClients(numberOfInMemBlockPermits);
- logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
+ logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ",
+ numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
}
@@ -220,7 +228,8 @@ public class DataList
public void flush(final int writeOffset)
{
- //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset);
+ //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset,
+ // nextOffset.integer, writeOffset);
flush:
do {
while (size == 0) {
@@ -427,7 +436,8 @@ public class DataList
suspendedClients.clear();
}
} else {
- logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners);
+ logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients,
+ numberOfInMemBlockPermits, all_listeners);
}
return resumedSuspendedClients;
}
@@ -608,10 +618,11 @@ public class DataList
try (DataListIterator dli = getIterator(this)) {
done:
while (dli.hasNext()) {
- SerializedData sd = dli.next();
+ final SerializedData sd = dli.next();
+ final int length = sd.length - sd.dataOffset + sd.offset;
switch (sd.buffer[sd.dataOffset]) {
case MessageType.RESET_WINDOW_VALUE:
- ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ final ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
bs = (long)rwt.getBaseSeconds() << 32;
if (bs > windowId) {
writingOffset = sd.offset;
@@ -620,7 +631,7 @@ public class DataList
break;
case MessageType.BEGIN_WINDOW_VALUE:
- BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
if ((bs | bwt.getWindowId()) >= windowId) {
writingOffset = sd.offset;
break done;
@@ -649,8 +660,9 @@ public class DataList
public void purge(long longWindowId)
{
-// logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}",
-// new Object[] {VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), VarInt.getStringWindowId(ending_window)});
+ //logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}",
+ // VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId),
+ // VarInt.getStringWindowId(ending_window));
boolean found = false;
long bs = starting_window & 0xffffffff00000000L;
SerializedData lastReset = null;
@@ -659,20 +671,22 @@ public class DataList
done:
while (dli.hasNext()) {
SerializedData sd = dli.next();
+ final int length = sd.length - sd.dataOffset + sd.offset;
switch (sd.buffer[sd.dataOffset]) {
case MessageType.RESET_WINDOW_VALUE:
- ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
bs = (long)rwt.getBaseSeconds() << 32;
lastReset = sd;
break;
case MessageType.BEGIN_WINDOW_VALUE:
- BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
if ((bs | bwt.getWindowId()) > longWindowId) {
found = true;
if (lastReset != null) {
/*
- * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
+ * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of
+ * the reset tuple.
*/
if (sd.offset >= lastReset.length) {
sd.offset -= lastReset.length;
@@ -702,7 +716,8 @@ public class DataList
* It helps with better utilization of the RAM.
*/
if (!found) {
- //logger.debug("we could not find a tuple which is in a window later than the window to be purged, so this has to be the last window published so far");
+ //logger.debug("we could not find a tuple which is in a window later than the window to be purged, " +
+ // "so this has to be the last window published so far");
if (lastReset != null && lastReset.offset != 0) {
this.readingOffset = this.writingOffset - lastReset.length;
System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length);
@@ -797,7 +812,8 @@ public class DataList
}
}
- private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, final Storage storage)
+ private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset,
+ final Storage storage)
{
return new Runnable()
{
@@ -819,7 +835,8 @@ public class DataList
logger.debug("Keeping Block {} unchanged", Block.this);
}
}
- assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+ numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
resumeSuspendedClients(numberOfInMemBlockPermits);
}
}
@@ -884,11 +901,14 @@ public class DataList
@Override
public String toString()
{
- return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
- + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset
- + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window)
- + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
- + ", future=" + (future == null ? "null" : future.isDone() ? "Done" : future.isCancelled() ? "Cancelled" : future) + '}';
+ final String future = this.future == null ? "null" : this.future.isDone() ? "Done" :
+ this.future.isCancelled() ? "Cancelled" : this.future.toString();
+ return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier +
+ ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset +
+ ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) +
+ ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() +
+ ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) +
+ ", future=" + future + '}';
}
}
@@ -968,8 +988,10 @@ public class DataList
if (nextOffset.integer + size <= da.writingOffset) {
current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
current.dataOffset = nextOffset.integer;
- //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
- // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
+ //final byte messageType = buffer[current.dataOffset];
+ //if (messageType == MessageType.BEGIN_WINDOW_VALUE || messageType == MessageType.END_WINDOW_VALUE) {
+ // final int length = current.length - current.dataOffset + current.offset;
+ // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, length);
// logger.debug("next t = {}", t);
//}
return true;
@@ -993,9 +1015,9 @@ public class DataList
}
/**
- * Removes from the underlying collection the last element returned by the iterator (optional operation). This method can be called only once per call to
- * next. The behavior of an iterator is unspecified if the underlying collection is modified while the iteration is in progress in any way other than by
- * calling this method.
+ * Removes from the underlying collection the last element returned by the iterator (optional operation). This
+ * method can be called only once per call to next. The behavior of an iterator is unspecified if the underlying
+ * collection is modified while the iteration is in progress in any way other than by calling this method.
*/
@Override
public void remove()
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
index 4add008..a6a1fab 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.bufferserver.internal;
-import com.datatorrent.bufferserver.util.BitVector;
-
import java.util.Collection;
+import com.datatorrent.bufferserver.util.BitVector;
+
/**
*
* Waits for data to be added to the buffer server and then acts on it<p>
@@ -32,17 +32,17 @@ import java.util.Collection;
*/
public interface DataListener
{
- public static final BitVector NULL_PARTITION = new BitVector(0, 0);
+ BitVector NULL_PARTITION = new BitVector(0, 0);
/**
*/
- public boolean addedData();
+ boolean addedData();
/**
*
* @param partitions
* @return int
*/
- public int getPartitions(Collection<BitVector> partitions);
+ int getPartitions(Collection<BitVector> partitions);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index 6ba7b64..af47f23 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -54,8 +54,7 @@ public class FastDataList extends DataList
size = last.data[processingOffset];
size |= (last.data[processingOffset + 1] << 8);
// logger.debug("read item = {} of size = {} at offset = {}", item++, size, processingOffset);
- }
- else {
+ } else {
if (writeOffset == last.data.length) {
processingOffset = 0;
size = 0;
@@ -73,8 +72,7 @@ public class FastDataList extends DataList
if (last.starting_window == -1) {
last.starting_window = baseSeconds | btw.getWindowId();
last.ending_window = last.starting_window;
- }
- else {
+ } else {
last.ending_window = baseSeconds | btw.getWindowId();
}
break;
@@ -83,11 +81,13 @@ public class FastDataList extends DataList
Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
baseSeconds = (long)rwt.getBaseSeconds() << 32;
break;
+
+ default:
+ break;
}
processingOffset += size;
size = 0;
- }
- else {
+ } else {
if (writeOffset == last.data.length) {
processingOffset = 0;
size = 0;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index f867d69..9856829 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -157,7 +157,8 @@ public class LogicalNode implements DataListener
public void catchUp()
{
long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
- logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
+ logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds),
+ Codec.getStringWindowId(lBaseSeconds));
if (lBaseSeconds > baseSeconds) {
baseSeconds = lBaseSeconds;
}
@@ -193,13 +194,8 @@ public class LogicalNode implements DataListener
case MessageType.BEGIN_WINDOW_VALUE:
tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
- logger.debug("{}->{} condition {} =? {}",
- new Object[] {
- upstream,
- group,
- Codec.getStringWindowId(baseSeconds | tuple.getWindowId()),
- Codec.getStringWindowId(skipWindowId)
- });
+ logger.debug("{}->{} condition {} =? {}", upstream, group,
+ Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), Codec.getStringWindowId(skipWindowId));
if ((baseSeconds | tuple.getWindowId()) > skipWindowId) {
logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples);
ready = GiveAll.getInstance().distribute(physicalNodes, data);
@@ -212,10 +208,12 @@ public class LogicalNode implements DataListener
case MessageType.CODEC_STATE_VALUE:
case MessageType.END_STREAM_VALUE:
ready = GiveAll.getInstance().distribute(physicalNodes, data);
- logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes);
+ logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]),
+ physicalNodes);
break;
default:
- logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes);
+ logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]),
+ physicalNodes);
}
}
} catch (InterruptedException ie) {
@@ -252,7 +250,8 @@ public class LogicalNode implements DataListener
break;
case MessageType.RESET_WINDOW_VALUE:
- Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+ final int length = data.length - data.dataOffset + data.offset;
+ Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, length);
baseSeconds = (long)resetWindow.getBaseSeconds() << 32;
ready = GiveAll.getInstance().distribute(physicalNodes, data);
break;
@@ -266,9 +265,10 @@ public class LogicalNode implements DataListener
} else {
while (ready && iterator.hasNext()) {
SerializedData data = iterator.next();
+ final int length = data.length - data.dataOffset + data.offset;
switch (data.buffer[data.dataOffset]) {
case MessageType.PAYLOAD_VALUE:
- Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+ Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
int value = tuple.getPartition();
for (BitVector bv : partitions) {
if (bv.matches(value)) {
@@ -283,7 +283,7 @@ public class LogicalNode implements DataListener
break;
case MessageType.RESET_WINDOW_VALUE:
- tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+ tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
baseSeconds = (long)tuple.getBaseSeconds() << 32;
ready = GiveAll.getInstance().distribute(physicalNodes, data);
break;
@@ -344,7 +344,8 @@ public class LogicalNode implements DataListener
@Override
public String toString()
{
- return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}';
+ return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions +
+ ", iterator=" + iterator + '}';
}
private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
index 880d444..424a51a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
@@ -79,8 +79,7 @@ public class PhysicalNode
if (client.write(d.buffer, d.offset, d.length)) {
return true;
}
- }
- else {
+ } else {
if (client.send(d.buffer, d.offset, d.length)) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
index 3e7f23f..cb1ad5f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
@@ -35,13 +35,13 @@ public class DataTuple extends Tuple
@Override
public int getWindowId()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getPartition()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
@@ -53,13 +53,13 @@ public class DataTuple extends Tuple
@Override
public int getBaseSeconds()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getWindowWidth()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
public static byte[] getSerializedTuple(byte type, Slice f)
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
index f034f04..3c3f184 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
@@ -41,31 +41,31 @@ public class EmptyTuple extends Tuple
@Override
public int getWindowId()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getPartition()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Slice getData()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getBaseSeconds()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getWindowWidth()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
public static byte[] getSerializedTuple(byte value)
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
index a815334..ea49077 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
@@ -25,8 +25,6 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.util.VarInt;
-import static com.datatorrent.bufferserver.packet.Tuple.CLASSIC_VERSION;
-import static com.datatorrent.bufferserver.packet.Tuple.writeString;
/**
* <p>GenericRequestTuple class.</p>
@@ -70,12 +68,10 @@ public class GenericRequestTuple extends RequestTuple
}
version = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
version = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
/*
@@ -87,12 +83,10 @@ public class GenericRequestTuple extends RequestTuple
}
identifier = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
identifier = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
@@ -105,8 +99,7 @@ public class GenericRequestTuple extends RequestTuple
}
valid = true;
- }
- catch (NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
logger.warn("Unparseable Tuple", nfe);
}
}
@@ -166,7 +159,8 @@ public class GenericRequestTuple extends RequestTuple
@Override
public String toString()
{
- return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + '}';
+ return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" +
+ Codec.getStringWindowId((long)baseSeconds | windowId) + '}';
}
private static final Logger logger = LoggerFactory.getLogger(GenericRequestTuple.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
index 02102da..3c0ec2c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
@@ -25,7 +25,20 @@ package com.datatorrent.bufferserver.packet;
*/
public enum MessageType
{
- NO_MESSAGE(0), PAYLOAD(1), RESET_WINDOW(2), BEGIN_WINDOW(3), END_WINDOW(4), END_STREAM(5), PUBLISHER_REQUEST(6), SUBSCRIBER_REQUEST(7), PURGE_REQUEST(8), RESET_REQUEST(9), CHECKPOINT(10), CODEC_STATE(11), NO_MESSAGE_ODD(127);
+ NO_MESSAGE(0),
+ PAYLOAD(1),
+ RESET_WINDOW(2),
+ BEGIN_WINDOW(3),
+ END_WINDOW(4),
+ END_STREAM(5),
+ PUBLISHER_REQUEST(6),
+ SUBSCRIBER_REQUEST(7),
+ PURGE_REQUEST(8),
+ RESET_REQUEST(9),
+ CHECKPOINT(10),
+ CODEC_STATE(11),
+ NO_MESSAGE_ODD(127);
+
public static final byte NO_MESSAGE_VALUE = 0;
public static final byte PAYLOAD_VALUE = 1;
public static final byte RESET_WINDOW_VALUE = 2;
@@ -81,7 +94,7 @@ public enum MessageType
private final int value;
- private MessageType(int value)
+ MessageType(int value)
{
this.value = value;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
index e757097..256fc05 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
@@ -51,7 +51,7 @@ public class PayloadTuple extends Tuple
@Override
public int getWindowId()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
@@ -69,13 +69,13 @@ public class PayloadTuple extends Tuple
@Override
public int getBaseSeconds()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getWindowWidth()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
public static byte[] getSerializedTuple(int partition, int size)
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
index bead7f3..6a9ba39 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
@@ -30,9 +30,10 @@ public class PublishRequestTuple extends GenericRequestTuple
super(array, offset, len);
}
- public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId)
+ public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
{
- return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.PUBLISHER_REQUEST_VALUE);
+ return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
+ MessageType.PUBLISHER_REQUEST_VALUE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
index 9fe7859..53505b4 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
@@ -43,19 +43,19 @@ public abstract class RequestTuple extends Tuple
@Override
public int getPartition()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Slice getData()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getWindowWidth()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
public abstract void parse();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
index 66fbfcd..17ca585 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
@@ -30,9 +30,10 @@ public class ResetRequestTuple extends GenericRequestTuple
super(array, offset, length);
}
- public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId)
+ public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
{
- return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.RESET_REQUEST_VALUE);
+ return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
+ MessageType.RESET_REQUEST_VALUE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
index abeceb3..6045416 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
@@ -43,19 +43,19 @@ public class ResetWindowTuple extends Tuple
@Override
public int getWindowId()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getPartition()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Slice getData()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
index 416cee9..b63487b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
@@ -61,12 +61,10 @@ public class SubscribeRequestTuple extends RequestTuple
}
version = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
version = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
/*
@@ -78,12 +76,10 @@ public class SubscribeRequestTuple extends RequestTuple
}
identifier = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
identifier = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
@@ -103,12 +99,10 @@ public class SubscribeRequestTuple extends RequestTuple
}
streamType = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
streamType = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
/*
@@ -120,12 +114,10 @@ public class SubscribeRequestTuple extends RequestTuple
}
upstreamIdentifier = new String(buffer, dataOffset, idlen);
dataOffset += idlen;
- }
- else if (idlen == 0) {
+ } else if (idlen == 0) {
upstreamIdentifier = EMPTY_STRING;
dataOffset++;
- }
- else {
+ } else {
return;
}
/*
@@ -139,8 +131,7 @@ public class SubscribeRequestTuple extends RequestTuple
if (mask > 0) {
while (buffer[dataOffset++] < 0) {
}
- }
- else {
+ } else {
/* mask cannot be zero */
return;
}
@@ -149,8 +140,7 @@ public class SubscribeRequestTuple extends RequestTuple
partitions[i] = readVarInt(dataOffset, limit);
if (partitions[i] == -1) {
return;
- }
- else {
+ } else {
while (buffer[dataOffset++] < 0) {
}
}
@@ -165,8 +155,7 @@ public class SubscribeRequestTuple extends RequestTuple
}
valid = true;
- }
- catch (NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
logger.warn("Unparseable Tuple", nfe);
}
}
@@ -246,15 +235,9 @@ public class SubscribeRequestTuple extends RequestTuple
return bufferSize;
}
- public static byte[] getSerializedRequest(
- String version,
- String id,
- String down_type,
- String upstream_id,
- int mask,
- Collection<Integer> partitions,
- long startingWindowId,
- int bufferSize)
+ public static byte[] getSerializedRequest(final String version, final String id, final String down_type,
+ final String upstream_id, final int mask, final Collection<Integer> partitions, final long startingWindowId,
+ final int bufferSize)
{
byte[] array = new byte[4096];
int offset = 0;
@@ -263,10 +246,7 @@ public class SubscribeRequestTuple extends RequestTuple
array[offset++] = MessageType.SUBSCRIBER_REQUEST_VALUE;
/* write the version */
- if (version == null) {
- version = CLASSIC_VERSION;
- }
- offset = Tuple.writeString(version, array, offset);
+ offset = Tuple.writeString(version == null ? CLASSIC_VERSION : version, array, offset);
/* write the identifier */
offset = Tuple.writeString(id, array, offset);
@@ -288,8 +268,7 @@ public class SubscribeRequestTuple extends RequestTuple
/* write the partitions */
if (partitions == null || partitions.isEmpty()) {
offset = VarInt.write(0, array, offset);
- }
- else {
+ } else {
offset = VarInt.write(partitions.size(), array, offset);
offset = VarInt.write(mask, array, offset);
for (int i : partitions) {
@@ -306,7 +285,11 @@ public class SubscribeRequestTuple extends RequestTuple
@Override
public String toString()
{
- return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + ", bufferSize=" + bufferSize + '}';
+ return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier +
+ ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType +
+ ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask +
+ ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) +
+ ", bufferSize=" + bufferSize + '}';
}
private static final Logger logger = LoggerFactory.getLogger(SubscribeRequestTuple.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index 1c45cb2..408e8a2 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -126,33 +126,28 @@ public abstract class Tuple
byte tmp = buffer[offset++];
if (tmp >= 0) {
return tmp;
- }
- else if (offset < limit) {
+ } else if (offset < limit) {
int integer = tmp & 0x7f;
tmp = buffer[offset++];
if (tmp >= 0) {
return integer | tmp << 7;
- }
- else if (offset < limit) {
+ } else if (offset < limit) {
integer |= (tmp & 0x7f) << 7;
tmp = buffer[offset++];
if (tmp >= 0) {
return integer | tmp << 14;
- }
- else if (offset < limit) {
+ } else if (offset < limit) {
integer |= (tmp & 0x7f) << 14;
tmp = buffer[offset++];
if (tmp >= 0) {
return integer | tmp << 21;
- }
- else if (offset < limit) {
+ } else if (offset < limit) {
integer |= (tmp & 0x7f) << 21;
tmp = buffer[offset++];
if (tmp >= 0) {
return integer | tmp << 28;
- }
- else {
+ } else {
throw new NumberFormatException("Invalid varint at location " + offset + " => "
+ Arrays.toString(Arrays.copyOfRange(buffer, offset, limit)));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
index 07c2f85..014827c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
@@ -49,25 +49,25 @@ public class WindowIdTuple extends Tuple
@Override
public int getPartition()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Slice getData()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getBaseSeconds()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getWindowWidth()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
index 25d3742..77aa56f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
@@ -25,21 +25,13 @@ import com.datatorrent.bufferserver.util.SerializedData;
/**
*
- * The base class for specifying partition policies, implements interface {@link com.datatorrent.bufferserver.policy.Policy}<p>
- * <br>
+ * The base class for specifying partition policies
*
* @since 0.3.2
*/
public class AbstractPolicy implements Policy
{
-
- /**
- *
- *
- * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
- * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
- */
-
+ @Override
public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
{
throw new UnsupportedOperationException("Not supported yet.");
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
index 88825ce..0c2819d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
@@ -33,7 +33,7 @@ import com.datatorrent.bufferserver.util.SerializedData;
*/
public class GiveAll extends AbstractPolicy
{
- final static GiveAll instance = new GiveAll();
+ private static final GiveAll instance = new GiveAll();
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
index 88145fc..c4143be 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
@@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData;
/**
*
- * Implements load balancing by sending the tuple to the least busy partition<p>
- * <br>
- * Basic load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * Implements load balancing by sending the tuple to the least busy partition.
+ * Basic load balancing policy. Extends the base class {@link AbstractPolicy}<br>
*
* @since 0.3.2
*/
@@ -51,20 +50,13 @@ public class LeastBusy extends AbstractPolicy
{
}
- /**
- *
- *
- * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
- * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
- */
@Override
public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
{
PhysicalNode theOne = null;
for (PhysicalNode node: nodes) {
- if (theOne == null
- || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) {
+ if (theOne == null || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) {
theOne = node;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
index 1393667..0080ce0 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
@@ -33,13 +33,13 @@ import com.datatorrent.bufferserver.util.SerializedData;
public interface Policy
{
/**
+ * Distributes {@code data} to the set of {@code nodes}
*
- *
- * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
- * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
+ * @param nodes Set of downstream {@link PhysicalNode}
+ * @param data Opaque {@link SerializedData} to be send
* @throws InterruptedException
+ * @return {@code true} if successful, otherwise {@code false}
*/
-
- public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException;
+ boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
index aebe450..4f700a6 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
@@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData;
/**
*
- * Randomly distributes tuples to downstream nodes. A random load balancing policy<p>
- * <br>
- * A generic random load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * Randomly distributes tuples to downstream nodes. A random load balancing policy.
+ * A generic random load balancing policy. Extends the base class {@link AbstractPolicy}
*
* @since 0.3.2
*/
@@ -51,13 +50,6 @@ public class RandomOne extends AbstractPolicy
{
}
- /**
- *
- *
- * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
- * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
- */
-
@Override
public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
index 1185815..7291b7d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
@@ -25,9 +25,10 @@ import com.datatorrent.bufferserver.util.SerializedData;
/**
*
- * Distributes to downstream nodes in a roundrobin fashion. A round robin load balancing policy<p>
+ * Distributes to downstream nodes in a round robin fashion. A round robin load balancing policy
* <br>
- * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends
+ * the base class {@link AbstractPolicy}<br>
* <br>
*
* @since 0.3.2
@@ -44,18 +45,15 @@ public class RoundRobin extends AbstractPolicy
index = 0;
}
- /**
- *
- *
- * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
- * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
- */
-
@Override
public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
{
int size = nodes.size();
- if (size > 0) { // why do i need to do this check? synchronization issues? because if there is no one interested, the logical group should not exist!
+ /*
+ * why do i need to do this check? synchronization issues? because if there is no one interested,
+ * the logical group should not exist!
+ */
+ if (size > 0) {
index %= size;
int count = index++;
/*
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c4cdf5b..03d96ee 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -91,7 +91,10 @@ public class Server implements ServerListener
this.blockSize = blocksize;
this.numberOfCacheBlocks = numberOfCacheBlocks;
serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper"));
- storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(numberOfCacheBlocks), new NameableThreadFactory("StorageHelper"), new ThreadPoolExecutor.CallerRunsPolicy());
+ final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(numberOfCacheBlocks);
+ final NameableThreadFactory threadFactory = new NameableThreadFactory("StorageHelper");
+ storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
}
public void setSpoolStorage(Storage storage)
@@ -168,8 +171,8 @@ public class Server implements ServerListener
private final HashMap<String, DataList> publisherBuffers = new HashMap<String, DataList>();
private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
- private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>();
- private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>();
+ private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>();
private final int blockSize;
private final int numberOfCacheBlocks;
@@ -251,7 +254,9 @@ public class Server implements ServerListener
dl = publisherBuffers.get(upstream_identifier);
//logger.debug("old list = {}", dl);
} else {
- dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
+ dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
+ new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) :
+ new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
publisherBuffers.put(upstream_identifier, dl);
//logger.debug("new list = {}", dl);
}
@@ -305,7 +310,9 @@ public class Server implements ServerListener
throw new RuntimeException(ie);
}
} else {
- dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks);
+ dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
+ new FastDataList(identifier, blockSize, numberOfCacheBlocks) :
+ new DataList(identifier, blockSize, numberOfCacheBlocks);
publisherBuffers.put(identifier, dl);
}
dl.setSecondaryStorage(storage, storageHelperExecutor);
@@ -439,9 +446,11 @@ public class Server implements ServerListener
// bufferSize = 16 * 1024;
// }
if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
- subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize);
+ subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
+ subscriberRequest.getPartitions(), bufferSize);
} else {
- subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize)
+ subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
+ subscriberRequest.getPartitions(), bufferSize)
{
@Override
public int readSize()
@@ -515,7 +524,8 @@ public class Server implements ServerListener
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
- logger.warn("Received data when no data is expected: {}", Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
+ logger.warn("Received data when no data is expected: {}",
+ Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
}
@Override
@@ -535,7 +545,8 @@ public class Server implements ServerListener
@Override
public String toString()
{
- return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}';
+ return "Server.Subscriber{" + "type=" + type + ", mask=" + mask +
+ ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}';
}
private volatile boolean torndown;
@@ -600,10 +611,12 @@ public class Server implements ServerListener
}
/**
- * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
- * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)}
- * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key
- * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+ * Schedules a task to conditionally resume I/O channel read operations.
+ * No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+ * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+ * Otherwise, calls {@linkplain #read(int) read(0)} to process data
+ * left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+ * in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
* @return true
*/
@Override
@@ -770,8 +783,9 @@ public class Server implements ServerListener
*/
/**
- * since the publisher server died, the queue which it was using would stop pumping the data unless a new publisher comes up with the same name. We leave
- * it to the stream to decide when to bring up a new node with the same identifier as the one which just died.
+ * since the publisher server died, the queue which it was using would stop pumping the data unless
+ * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+ * with the same identifier as the one which just died.
*/
if (publisherChannels.containsValue(this)) {
final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
index 3ddc77f..02ba340 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
@@ -66,8 +66,7 @@ public class DiskStorage implements Storage
Character c = name.charAt(i);
if (Character.isLetterOrDigit(c)) {
sb.append(c);
- }
- else {
+ } else {
sb.append('-');
}
}
@@ -90,30 +89,25 @@ public class DiskStorage implements Storage
synchronized (this) {
lUniqueIdentifier = ++this.uniqueIdentifier;
}
+ } else {
+ throw new IllegalStateException("Collision in identifier name, please ensure that the slug for " +
+ "the identifiers is different");
}
- else {
- throw new IllegalStateException("Collission in identifier name, please ensure that the slug for the identifiers is differents");
- }
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
- }
- else {
+ } else {
throw new IllegalStateException("Identity file is hijacked!");
}
- }
- else {
+ } else {
if (directory.mkdir()) {
File identity = new File(directory, "identity");
try {
Files.write(identifier.getBytes(), identity);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
- }
- else {
+ } else {
throw new RuntimeException("directory " + directory.getAbsolutePath() + " could not be created!");
}
@@ -122,8 +116,7 @@ public class DiskStorage implements Storage
try {
return writeFile(bytes, startingOffset, endingOffset, directory, lUniqueIdentifier);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -144,24 +137,20 @@ public class DiskStorage implements Storage
if (!deletionFile.delete()) {
throw new RuntimeException("File " + deletionFile.getPath() + " could not be deleted!");
}
- }
- else {
+ } else {
throw new RuntimeException("File " + deletionFile.getPath() + " either is non existent or not a file!");
}
+ } else {
+ throw new RuntimeException("Collision in the identifier name, please ensure that the slugs for " +
+ "the identifiers are different");
}
- else {
- throw new RuntimeException("Collission in identifier name, please ensure that the slug for the identifiers is differents");
- }
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
- }
- else {
+ } else {
throw new RuntimeException(identityFile + " is not a file!");
}
- }
- else {
+ } else {
throw new RuntimeException("directory " + directory.getPath() + " does not exist!");
}
}
@@ -180,37 +169,31 @@ public class DiskStorage implements Storage
File filename = new File(directory, String.valueOf(uniqueIdentifier));
if (filename.exists() && filename.isFile()) {
return Files.toByteArray(filename);
- }
- else {
+ } else {
throw new RuntimeException("File " + filename.getPath() + " either is non existent or not a file!");
}
+ } else {
+ throw new RuntimeException("Collision in the identifier name," +
+ " please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) +
+ "] are different.");
}
- else {
- throw new RuntimeException("Collision in identifier name, please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) + "] are different.");
- }
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
- }
- else {
+ } else {
throw new RuntimeException(identityFile + " is not a file!");
}
- }
- else {
+ } else {
throw new RuntimeException("directory " + directory.getPath() + " does not exist!");
}
}
- protected int writeFile(byte[] bytes, int startingOffset, int endingOffset, File directory, final int number) throws IOException
+ protected int writeFile(final byte[] bytes, final int startingOffset, final int endingOffset, final File directory,
+ final int number) throws IOException
{
- FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)));
- try {
+ try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)))) {
stream.write(bytes, startingOffset, endingOffset - startingOffset);
}
- finally {
- stream.close();
- }
return number;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
index 16f443f..0bc065e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
@@ -21,8 +21,8 @@ package com.datatorrent.bufferserver.util;
import com.datatorrent.netlet.util.Slice;
/**
- * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying array.<p>
- *
+ * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying
+ * array.<p>
*
* <b>Note:</b> Multibyte accessors all use big-endian order.
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 0b2b67a..124cc5f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -18,12 +18,12 @@
*/
package com.datatorrent.bufferserver.util;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.EventLoop;
-
import java.io.IOException;
import java.util.HashMap;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.EventLoop;
+
/**
* <p>System class.</p>
*
@@ -40,8 +40,7 @@ public class System
if (el == null) {
try {
eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier));
- }
- catch (IOException io) {
+ } catch (IOException io) {
throw new RuntimeException(io);
}
}
@@ -55,8 +54,7 @@ public class System
DefaultEventLoop el = eventloops.get(identifier);
if (el == null) {
throw new RuntimeException("System with " + identifier + " not setup!");
- }
- else {
+ } else {
el.stop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
index 6f12cc4..d8583fb 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
@@ -38,18 +38,15 @@ public class VarInt extends com.datatorrent.netlet.util.VarInt
int result = tmp & 0x7f;
if ((tmp = data[offset++]) >= 0) {
result |= tmp << 7;
- }
- else {
+ } else {
result |= (tmp & 0x7f) << 7;
if ((tmp = data[offset++]) >= 0) {
result |= tmp << 14;
- }
- else {
+ } else {
result |= (tmp & 0x7f) << 14;
if ((tmp = data[offset++]) >= 0) {
result |= tmp << 21;
- }
- else {
+ } else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = data[offset++]) << 28;
if (tmp < 0) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
index ee56e4d..234fb12 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
+
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -40,6 +40,9 @@ import com.datatorrent.bufferserver.support.Subscriber;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.DefaultEventLoop;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
/**
*
*/
@@ -57,8 +60,7 @@ public class SubscriberTest
try {
eventloopServer = DefaultEventLoop.createEventLoop("server");
eventloopClient = DefaultEventLoop.createEventLoop("client");
- }
- catch (IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
}
eventloopServer.start();
@@ -66,7 +68,8 @@ public class SubscriberTest
instance = new Server(0, 64, 2);
address = instance.run(eventloopServer);
- assert (address instanceof InetSocketAddress);
+ assertTrue(address instanceof InetSocketAddress);
+ assertFalse(address.isUnresolved());
}
@AfterClass
@@ -82,7 +85,7 @@ public class SubscriberTest
public void test() throws InterruptedException
{
final Publisher bsp1 = new Publisher("MyPublisher");
- eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp1);
+ eventloopClient.connect(address, bsp1);
final Subscriber bss1 = new Subscriber("MySubscriber")
{
@@ -104,7 +107,7 @@ public class SubscriberTest
}
};
- eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss1);
+ eventloopClient.connect(address, bss1);
final int baseWindow = 0x7afebabe;
bsp1.activate(null, baseWindow, 0);
@@ -131,12 +134,9 @@ public class SubscriberTest
windowId++;
Thread.sleep(5);
}
- }
- catch (InterruptedException ex) {
- }
- catch (CancelledKeyException cke) {
- }
- finally {
+ } catch (InterruptedException | CancelledKeyException e) {
+ logger.debug("{}", e);
+ } finally {
logger.debug("publisher the middle of window = {}", Codec.getStringWindowId(windowId));
}
}
@@ -158,7 +158,7 @@ public class SubscriberTest
* subscribe from 8 onwards. What we should see is that subscriber gets the new data from 8 onwards.
*/
final Publisher bsp2 = new Publisher("MyPublisher");
- eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp2);
+ eventloopClient.connect(address, bsp2);
bsp2.activate(null, 0x7afebabe, 5);
final Subscriber bss2 = new Subscriber("MyPublisher")
@@ -175,7 +175,7 @@ public class SubscriberTest
}
};
- eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss2);
+ eventloopClient.connect(address, bss2);
bss2.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0x7afebabe00000008L, 0);
@@ -200,12 +200,9 @@ public class SubscriberTest
windowId++;
Thread.sleep(5);
}
- }
- catch (InterruptedException ex) {
- }
- catch (CancelledKeyException cke) {
- }
- finally {
+ } catch (InterruptedException | CancelledKeyException e) {
+ logger.debug("", e);
+ } finally {
logger.debug("publisher in the middle of window = {}", Codec.getStringWindowId(windowId));
}
}
@@ -221,7 +218,7 @@ public class SubscriberTest
eventloopClient.disconnect(bsp2);
eventloopClient.disconnect(bss2);
- Assert.assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get());
+ assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
index 5dc581c..04767ef 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
@@ -18,34 +18,17 @@
*/
package com.datatorrent.bufferserver.packet;
-import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.testng.annotations.Test;
/**
*
*/
-public class NoMessageTupleTest extends TestCase
+public class NoMessageTupleTest
{
- public NoMessageTupleTest(String testName)
- {
- super(testName);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
@Test
public void testSerDe()
{
@@ -54,7 +37,7 @@ public class NoMessageTupleTest extends TestCase
byte[] serialized = NoMessageTuple.getSerializedTuple();
Tuple t = Tuple.getTuple(serialized, 0, serialized.length);
- assert(t.getType() == MessageType.NO_MESSAGE);
+ assert t.getType() == MessageType.NO_MESSAGE;
}
private static final Logger logger = LoggerFactory.getLogger(NoMessageTupleTest.class);