You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/12/02 05:58:26 UTC

[2/3] incubator-apex-core git commit: APEX-273 - Fix existing checkstyle violations in bufferserver module

APEX-273 - Fix existing checkstyle violations in bufferserver module


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/799df6c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/799df6c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/799df6c0

Branch: refs/heads/devel-3
Commit: 799df6c0aba94c20bef0eb2d975d3f5649b02f54
Parents: 892355c
Author: MalharJenkins <je...@datatorrent.com>
Authored: Thu Nov 19 15:44:13 2015 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Nov 20 18:31:15 2015 -0800

----------------------------------------------------------------------
 bufferserver/pom.xml                            |  3 +-
 .../bufferserver/auth/AuthManager.java          |  2 +-
 .../bufferserver/client/AuthClient.java         |  6 +-
 .../bufferserver/client/Controller.java         |  1 -
 .../bufferserver/client/Subscriber.java         | 15 +---
 .../bufferserver/internal/DataList.java         | 90 ++++++++++++--------
 .../bufferserver/internal/DataListener.java     | 10 +--
 .../bufferserver/internal/FastDataList.java     | 12 +--
 .../bufferserver/internal/LogicalNode.java      | 29 ++++---
 .../bufferserver/internal/PhysicalNode.java     |  3 +-
 .../bufferserver/packet/DataTuple.java          |  8 +-
 .../bufferserver/packet/EmptyTuple.java         | 10 +--
 .../packet/GenericRequestTuple.java             | 20 ++---
 .../bufferserver/packet/MessageType.java        | 17 +++-
 .../bufferserver/packet/PayloadTuple.java       |  6 +-
 .../packet/PublishRequestTuple.java             |  5 +-
 .../bufferserver/packet/RequestTuple.java       |  6 +-
 .../bufferserver/packet/ResetRequestTuple.java  |  5 +-
 .../bufferserver/packet/ResetWindowTuple.java   |  6 +-
 .../packet/SubscribeRequestTuple.java           | 59 +++++--------
 .../datatorrent/bufferserver/packet/Tuple.java  | 15 ++--
 .../bufferserver/packet/WindowIdTuple.java      |  8 +-
 .../bufferserver/policy/AbstractPolicy.java     | 12 +--
 .../bufferserver/policy/GiveAll.java            |  2 +-
 .../bufferserver/policy/LeastBusy.java          | 14 +--
 .../datatorrent/bufferserver/policy/Policy.java | 10 +--
 .../bufferserver/policy/RandomOne.java          | 12 +--
 .../bufferserver/policy/RoundRobin.java         | 18 ++--
 .../datatorrent/bufferserver/server/Server.java | 44 ++++++----
 .../bufferserver/storage/DiskStorage.java       | 73 ++++++----------
 .../bufferserver/util/SerializedData.java       |  4 +-
 .../datatorrent/bufferserver/util/System.java   | 12 ++-
 .../datatorrent/bufferserver/util/VarInt.java   |  9 +-
 .../bufferserver/client/SubscriberTest.java     | 39 ++++-----
 .../bufferserver/packet/NoMessageTupleTest.java | 23 +----
 .../packet/PublishRequestTupleTest.java         | 10 +--
 .../packet/ResetWindowTupleTest.java            | 10 +--
 .../packet/SubscribeRequestTupleTest.java       | 20 +++--
 .../bufferserver/server/ServerTest.java         | 67 ++++++++-------
 .../bufferserver/storage/DiskStorageTest.java   | 11 +--
 .../bufferserver/support/Subscriber.java        |  9 +-
 .../bufferserver/util/CodecTest.java            |  4 +-
 42 files changed, 346 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/pom.xml
----------------------------------------------------------------------
diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml
index f6fc8b3..efc5b5e 100644
--- a/bufferserver/pom.xml
+++ b/bufferserver/pom.xml
@@ -48,10 +48,9 @@
       </plugin>
       -->
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>228</maxAllowedViolations>
+          <consoleOutput>true</consoleOutput>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
index 453befa..942a896 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
@@ -27,7 +27,7 @@ import java.security.SecureRandom;
  */
 public class AuthManager
 {
-  private final static int BUFFER_SERVER_TOKEN_LENGTH = 20;
+  private static final int BUFFER_SERVER_TOKEN_LENGTH = 20;
 
   private static SecureRandom generator = new SecureRandom();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
index fc105b2..4465143 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
@@ -45,7 +45,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient
     super(readbuffer, position, sendBufferSize);
   }
 
-  protected void sendAuthenticate() {
+  protected void sendAuthenticate()
+  {
     if (token != null) {
       write(token);
     }
@@ -65,7 +66,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient
         }
       }
       if (!authenticated) {
-        throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials.");
+        throw new AccessControlException("Buffer server security is enabled." +
+            " Access is restricted without proper credentials.");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
index d2faf69..0c3bac9 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
@@ -26,7 +26,6 @@ import com.datatorrent.bufferserver.packet.PurgeRequestTuple;
 import com.datatorrent.bufferserver.packet.ResetRequestTuple;
 import com.datatorrent.bufferserver.packet.Tuple;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.util.Slice;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
index df91f0b..2b18d04 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
+import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest;
 
 /**
  *
@@ -46,18 +46,11 @@ public abstract class Subscriber extends AuthClient
     this.id = id;
   }
 
-  public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize)
+  public void activate(final String version, final String type, final String sourceId, final int mask,
+      final Collection<Integer> partitions, final long windowId, final int bufferSize)
   {
     sendAuthenticate();
-    write(SubscribeRequestTuple.getSerializedRequest(
-            version,
-            id,
-            type,
-            sourceId,
-            mask,
-            partitions,
-            windowId,
-            bufferSize));
+    write(getSerializedRequest(version, id, type, sourceId, mask, partitions, windowId, bufferSize));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 1f6c273..fa20aa2 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -90,8 +90,8 @@ public class DataList
   public DataList(String identifier)
   {
     /*
-     * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem.
-     * we will use default value of 8 block sizes to be cached in memory
+     * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block
+     * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory
      */
     this(identifier, 64 * 1024 * 1024, 8);
   }
@@ -104,7 +104,8 @@ public class DataList
   public void rewind(final int baseSeconds, final int windowId) throws IOException
   {
     final long longWindowId = (long)baseSeconds << 32 | windowId;
-    logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId));
+    logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window),
+        Codec.getStringWindowId(longWindowId));
 
     int numberOfInMemBlockRewound = 0;
     synchronized (this) {
@@ -139,13 +140,16 @@ public class DataList
     }
 
     /*
-      TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block.
-    */
+     * TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last
+     *  block.
+     */
 
     final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound);
-    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+        numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
     resumeSuspendedClients(numberOfInMemBlockPermits);
-    logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
+    logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after" +
+        " rewinding {}.", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
 
   }
 
@@ -177,11 +181,13 @@ public class DataList
   public void purge(final int baseSeconds, final int windowId)
   {
     final long longWindowId = (long)baseSeconds << 32 | windowId;
-    logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId));
+    logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window),
+        Codec.getStringWindowId(longWindowId));
 
     int numberOfInMemBlockPurged = 0;
     synchronized (this) {
-      for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) {
+      for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId;
+          prev = temp, temp = temp.next) {
         if (temp.ending_window > longWindowId || temp == last) {
           if (prev != null) {
             first = temp;
@@ -204,9 +210,11 @@ public class DataList
     }
 
     final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged);
-    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+        numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
     resumeSuspendedClients(numberOfInMemBlockPermits);
-    logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
+    logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ",
+        numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
 
   }
 
@@ -220,7 +228,8 @@ public class DataList
 
   public void flush(final int writeOffset)
   {
-    //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset);
+    //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset,
+    //    nextOffset.integer, writeOffset);
     flush:
     do {
       while (size == 0) {
@@ -427,7 +436,8 @@ public class DataList
         suspendedClients.clear();
       }
     } else {
-      logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners);
+      logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients,
+          numberOfInMemBlockPermits, all_listeners);
     }
     return resumedSuspendedClients;
   }
@@ -608,10 +618,11 @@ public class DataList
       try (DataListIterator dli = getIterator(this)) {
         done:
         while (dli.hasNext()) {
-          SerializedData sd = dli.next();
+          final SerializedData sd = dli.next();
+          final int length = sd.length - sd.dataOffset + sd.offset;
           switch (sd.buffer[sd.dataOffset]) {
             case MessageType.RESET_WINDOW_VALUE:
-              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              final ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
               bs = (long)rwt.getBaseSeconds() << 32;
               if (bs > windowId) {
                 writingOffset = sd.offset;
@@ -620,7 +631,7 @@ public class DataList
               break;
 
             case MessageType.BEGIN_WINDOW_VALUE:
-              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
               if ((bs | bwt.getWindowId()) >= windowId) {
                 writingOffset = sd.offset;
                 break done;
@@ -649,8 +660,9 @@ public class DataList
 
     public void purge(long longWindowId)
     {
-//    logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}",
-//                 new Object[] {VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), VarInt.getStringWindowId(ending_window)});
+      //logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}",
+      //    VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId),
+      //    VarInt.getStringWindowId(ending_window));
       boolean found = false;
       long bs = starting_window & 0xffffffff00000000L;
       SerializedData lastReset = null;
@@ -659,20 +671,22 @@ public class DataList
         done:
         while (dli.hasNext()) {
           SerializedData sd = dli.next();
+          final int length = sd.length - sd.dataOffset + sd.offset;
           switch (sd.buffer[sd.dataOffset]) {
             case MessageType.RESET_WINDOW_VALUE:
-              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
               bs = (long)rwt.getBaseSeconds() << 32;
               lastReset = sd;
               break;
 
             case MessageType.BEGIN_WINDOW_VALUE:
-              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
               if ((bs | bwt.getWindowId()) > longWindowId) {
                 found = true;
                 if (lastReset != null) {
                   /*
-                   * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
+                   * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of
+                   * the reset tuple.
                    */
                   if (sd.offset >= lastReset.length) {
                     sd.offset -= lastReset.length;
@@ -702,7 +716,8 @@ public class DataList
        * It helps with better utilization of the RAM.
        */
       if (!found) {
-        //logger.debug("we could not find a tuple which is in a window later than the window to be purged, so this has to be the last window published so far");
+        //logger.debug("we could not find a tuple which is in a window later than the window to be purged, " +
+        //    "so this has to be the last window published so far");
         if (lastReset != null && lastReset.offset != 0) {
           this.readingOffset = this.writingOffset - lastReset.length;
           System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length);
@@ -797,7 +812,8 @@ public class DataList
       }
     }
 
-    private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, final Storage storage)
+    private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset,
+        final Storage storage)
     {
       return new Runnable()
       {
@@ -819,7 +835,8 @@ public class DataList
                 logger.debug("Keeping Block {} unchanged", Block.this);
               }
             }
-            assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+            assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
+                numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
             resumeSuspendedClients(numberOfInMemBlockPermits);
           }
         }
@@ -884,11 +901,14 @@ public class DataList
     @Override
     public String toString()
     {
-      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
-             + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset
-             + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window)
-             + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
-             + ", future=" + (future == null ? "null" : future.isDone() ? "Done" : future.isCancelled() ? "Cancelled" : future) + '}';
+      final String future = this.future == null ? "null" : this.future.isDone() ? "Done" :
+          this.future.isCancelled() ? "Cancelled" : this.future.toString();
+      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier +
+          ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset +
+          ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) +
+          ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() +
+          ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) +
+          ", future=" + future + '}';
     }
 
   }
@@ -968,8 +988,10 @@ public class DataList
       if (nextOffset.integer + size <= da.writingOffset) {
         current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
         current.dataOffset = nextOffset.integer;
-        //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
-        //  Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
+        //final byte messageType = buffer[current.dataOffset];
+        //if (messageType == MessageType.BEGIN_WINDOW_VALUE || messageType == MessageType.END_WINDOW_VALUE) {
+        //  final int length = current.length - current.dataOffset + current.offset;
+        //  Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, length);
         //  logger.debug("next t = {}", t);
         //}
         return true;
@@ -993,9 +1015,9 @@ public class DataList
     }
 
     /**
-     * Removes from the underlying collection the last element returned by the iterator (optional operation). This method can be called only once per call to
-     * next. The behavior of an iterator is unspecified if the underlying collection is modified while the iteration is in progress in any way other than by
-     * calling this method.
+     * Removes from the underlying collection the last element returned by the iterator (optional operation). This
+     * method can be called only once per call to next. The behavior of an iterator is unspecified if the underlying
+     * collection is modified while the iteration is in progress in any way other than by calling this method.
      */
     @Override
     public void remove()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
index 4add008..a6a1fab 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.bufferserver.internal;
 
-import com.datatorrent.bufferserver.util.BitVector;
-
 import java.util.Collection;
 
+import com.datatorrent.bufferserver.util.BitVector;
+
 /**
  *
  * Waits for data to be added to the buffer server and then acts on it<p>
@@ -32,17 +32,17 @@ import java.util.Collection;
  */
 public interface DataListener
 {
-  public static final BitVector NULL_PARTITION = new BitVector(0, 0);
+  BitVector NULL_PARTITION = new BitVector(0, 0);
 
   /**
    */
-  public boolean addedData();
+  boolean addedData();
 
   /**
    *
    * @param partitions
    * @return int
    */
-  public int getPartitions(Collection<BitVector> partitions);
+  int getPartitions(Collection<BitVector> partitions);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index 6ba7b64..af47f23 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -54,8 +54,7 @@ public class FastDataList extends DataList
           size = last.data[processingOffset];
           size |= (last.data[processingOffset + 1] << 8);
 //          logger.debug("read item = {} of size = {} at offset = {}", item++, size, processingOffset);
-        }
-        else {
+        } else {
           if (writeOffset == last.data.length) {
             processingOffset = 0;
             size = 0;
@@ -73,8 +72,7 @@ public class FastDataList extends DataList
             if (last.starting_window == -1) {
               last.starting_window = baseSeconds | btw.getWindowId();
               last.ending_window = last.starting_window;
-            }
-            else {
+            } else {
               last.ending_window = baseSeconds | btw.getWindowId();
             }
             break;
@@ -83,11 +81,13 @@ public class FastDataList extends DataList
             Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
             baseSeconds = (long)rwt.getBaseSeconds() << 32;
             break;
+
+          default:
+            break;
         }
         processingOffset += size;
         size = 0;
-      }
-      else {
+      } else {
         if (writeOffset == last.data.length) {
           processingOffset = 0;
           size = 0;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index f867d69..9856829 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -157,7 +157,8 @@ public class LogicalNode implements DataListener
   public void catchUp()
   {
     long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
-    logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
+    logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds),
+        Codec.getStringWindowId(lBaseSeconds));
     if (lBaseSeconds > baseSeconds) {
       baseSeconds = lBaseSeconds;
     }
@@ -193,13 +194,8 @@ public class LogicalNode implements DataListener
 
             case MessageType.BEGIN_WINDOW_VALUE:
               tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
-              logger.debug("{}->{} condition {} =? {}",
-                  new Object[] {
-                      upstream,
-                      group,
-                      Codec.getStringWindowId(baseSeconds | tuple.getWindowId()),
-                      Codec.getStringWindowId(skipWindowId)
-                  });
+              logger.debug("{}->{} condition {} =? {}", upstream, group,
+                  Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), Codec.getStringWindowId(skipWindowId));
               if ((baseSeconds | tuple.getWindowId()) > skipWindowId) {
                 logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples);
                 ready = GiveAll.getInstance().distribute(physicalNodes, data);
@@ -212,10 +208,12 @@ public class LogicalNode implements DataListener
             case MessageType.CODEC_STATE_VALUE:
             case MessageType.END_STREAM_VALUE:
               ready = GiveAll.getInstance().distribute(physicalNodes, data);
-              logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes);
+              logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]),
+                  physicalNodes);
               break;
             default:
-              logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes);
+              logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]),
+                  physicalNodes);
           }
         }
       } catch (InterruptedException ie) {
@@ -252,7 +250,8 @@ public class LogicalNode implements DataListener
                   break;
 
                 case MessageType.RESET_WINDOW_VALUE:
-                  Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+                  final int length = data.length - data.dataOffset + data.offset;
+                  Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, length);
                   baseSeconds = (long)resetWindow.getBaseSeconds() << 32;
                   ready = GiveAll.getInstance().distribute(physicalNodes, data);
                   break;
@@ -266,9 +265,10 @@ public class LogicalNode implements DataListener
           } else {
             while (ready && iterator.hasNext()) {
               SerializedData data = iterator.next();
+              final int length = data.length - data.dataOffset + data.offset;
               switch (data.buffer[data.dataOffset]) {
                 case MessageType.PAYLOAD_VALUE:
-                  Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+                  Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
                   int value = tuple.getPartition();
                   for (BitVector bv : partitions) {
                     if (bv.matches(value)) {
@@ -283,7 +283,7 @@ public class LogicalNode implements DataListener
                   break;
 
                 case MessageType.RESET_WINDOW_VALUE:
-                  tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
+                  tuple = Tuple.getTuple(data.buffer, data.dataOffset, length);
                   baseSeconds = (long)tuple.getBaseSeconds() << 32;
                   ready = GiveAll.getInstance().distribute(physicalNodes, data);
                   break;
@@ -344,7 +344,8 @@ public class LogicalNode implements DataListener
   @Override
   public String toString()
   {
-    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}';
+    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions +
+        ", iterator=" + iterator + '}';
   }
 
   private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
index 880d444..424a51a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
@@ -79,8 +79,7 @@ public class PhysicalNode
       if (client.write(d.buffer, d.offset, d.length)) {
         return true;
       }
-    }
-    else {
+    } else {
       if (client.send(d.buffer, d.offset, d.length)) {
         return true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
index 3e7f23f..cb1ad5f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
@@ -35,13 +35,13 @@ public class DataTuple extends Tuple
   @Override
   public int getWindowId()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getPartition()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
@@ -53,13 +53,13 @@ public class DataTuple extends Tuple
   @Override
   public int getBaseSeconds()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getWindowWidth()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   public static byte[] getSerializedTuple(byte type, Slice f)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
index f034f04..3c3f184 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
@@ -41,31 +41,31 @@ public class EmptyTuple extends Tuple
   @Override
   public int getWindowId()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getPartition()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public Slice getData()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getBaseSeconds()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getWindowWidth()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   public static byte[] getSerializedTuple(byte value)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
index a815334..ea49077 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
@@ -25,8 +25,6 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.util.VarInt;
-import static com.datatorrent.bufferserver.packet.Tuple.CLASSIC_VERSION;
-import static com.datatorrent.bufferserver.packet.Tuple.writeString;
 
 /**
  * <p>GenericRequestTuple class.</p>
@@ -70,12 +68,10 @@ public class GenericRequestTuple extends RequestTuple
         }
         version = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         version = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
       /*
@@ -87,12 +83,10 @@ public class GenericRequestTuple extends RequestTuple
         }
         identifier = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         identifier = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
 
@@ -105,8 +99,7 @@ public class GenericRequestTuple extends RequestTuple
       }
 
       valid = true;
-    }
-    catch (NumberFormatException nfe) {
+    } catch (NumberFormatException nfe) {
       logger.warn("Unparseable Tuple", nfe);
     }
   }
@@ -166,7 +159,8 @@ public class GenericRequestTuple extends RequestTuple
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + '}';
+    return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" +
+        Codec.getStringWindowId((long)baseSeconds | windowId) + '}';
   }
 
   private static final Logger logger = LoggerFactory.getLogger(GenericRequestTuple.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
index 02102da..3c0ec2c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
@@ -25,7 +25,20 @@ package com.datatorrent.bufferserver.packet;
  */
 public enum MessageType
 {
-  NO_MESSAGE(0), PAYLOAD(1), RESET_WINDOW(2), BEGIN_WINDOW(3), END_WINDOW(4), END_STREAM(5), PUBLISHER_REQUEST(6), SUBSCRIBER_REQUEST(7), PURGE_REQUEST(8), RESET_REQUEST(9), CHECKPOINT(10), CODEC_STATE(11), NO_MESSAGE_ODD(127);
+  NO_MESSAGE(0),
+  PAYLOAD(1),
+  RESET_WINDOW(2),
+  BEGIN_WINDOW(3),
+  END_WINDOW(4),
+  END_STREAM(5),
+  PUBLISHER_REQUEST(6),
+  SUBSCRIBER_REQUEST(7),
+  PURGE_REQUEST(8),
+  RESET_REQUEST(9),
+  CHECKPOINT(10),
+  CODEC_STATE(11),
+  NO_MESSAGE_ODD(127);
+
   public static final byte NO_MESSAGE_VALUE = 0;
   public static final byte PAYLOAD_VALUE = 1;
   public static final byte RESET_WINDOW_VALUE = 2;
@@ -81,7 +94,7 @@ public enum MessageType
 
   private final int value;
 
-  private MessageType(int value)
+  MessageType(int value)
   {
     this.value = value;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
index e757097..256fc05 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
@@ -51,7 +51,7 @@ public class PayloadTuple extends Tuple
   @Override
   public int getWindowId()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
@@ -69,13 +69,13 @@ public class PayloadTuple extends Tuple
   @Override
   public int getBaseSeconds()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getWindowWidth()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   public static byte[] getSerializedTuple(int partition, int size)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
index bead7f3..6a9ba39 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
@@ -30,9 +30,10 @@ public class PublishRequestTuple extends GenericRequestTuple
     super(array, offset, len);
   }
 
-  public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId)
+  public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
   {
-    return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.PUBLISHER_REQUEST_VALUE);
+    return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
+        MessageType.PUBLISHER_REQUEST_VALUE);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
index 9fe7859..53505b4 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
@@ -43,19 +43,19 @@ public abstract class RequestTuple extends Tuple
   @Override
   public int getPartition()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public Slice getData()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getWindowWidth()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   public abstract void parse();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
index 66fbfcd..17ca585 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
@@ -30,9 +30,10 @@ public class ResetRequestTuple extends GenericRequestTuple
     super(array, offset, length);
   }
 
-  public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId)
+  public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
   {
-    return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.RESET_REQUEST_VALUE);
+    return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
+        MessageType.RESET_REQUEST_VALUE);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
index abeceb3..6045416 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
@@ -43,19 +43,19 @@ public class ResetWindowTuple extends Tuple
   @Override
   public int getWindowId()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getPartition()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public Slice getData()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
index 416cee9..b63487b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
@@ -61,12 +61,10 @@ public class SubscribeRequestTuple extends RequestTuple
         }
         version = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         version = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
       /*
@@ -78,12 +76,10 @@ public class SubscribeRequestTuple extends RequestTuple
         }
         identifier = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         identifier = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
 
@@ -103,12 +99,10 @@ public class SubscribeRequestTuple extends RequestTuple
         }
         streamType = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         streamType = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
       /*
@@ -120,12 +114,10 @@ public class SubscribeRequestTuple extends RequestTuple
         }
         upstreamIdentifier = new String(buffer, dataOffset, idlen);
         dataOffset += idlen;
-      }
-      else if (idlen == 0) {
+      } else if (idlen == 0) {
         upstreamIdentifier = EMPTY_STRING;
         dataOffset++;
-      }
-      else {
+      } else {
         return;
       }
       /*
@@ -139,8 +131,7 @@ public class SubscribeRequestTuple extends RequestTuple
         if (mask > 0) {
           while (buffer[dataOffset++] < 0) {
           }
-        }
-        else {
+        } else {
           /* mask cannot be zero */
           return;
         }
@@ -149,8 +140,7 @@ public class SubscribeRequestTuple extends RequestTuple
           partitions[i] = readVarInt(dataOffset, limit);
           if (partitions[i] == -1) {
             return;
-          }
-          else {
+          } else {
             while (buffer[dataOffset++] < 0) {
             }
           }
@@ -165,8 +155,7 @@ public class SubscribeRequestTuple extends RequestTuple
       }
 
       valid = true;
-    }
-    catch (NumberFormatException nfe) {
+    } catch (NumberFormatException nfe) {
       logger.warn("Unparseable Tuple", nfe);
     }
   }
@@ -246,15 +235,9 @@ public class SubscribeRequestTuple extends RequestTuple
     return bufferSize;
   }
 
-  public static byte[] getSerializedRequest(
-          String version,
-          String id,
-          String down_type,
-          String upstream_id,
-          int mask,
-          Collection<Integer> partitions,
-          long startingWindowId,
-          int bufferSize)
+  public static byte[] getSerializedRequest(final String version, final String id, final String down_type,
+      final String upstream_id, final int mask, final Collection<Integer> partitions, final long startingWindowId,
+      final int bufferSize)
   {
     byte[] array = new byte[4096];
     int offset = 0;
@@ -263,10 +246,7 @@ public class SubscribeRequestTuple extends RequestTuple
     array[offset++] = MessageType.SUBSCRIBER_REQUEST_VALUE;
 
     /* write the version */
-    if (version == null) {
-      version = CLASSIC_VERSION;
-    }
-    offset = Tuple.writeString(version, array, offset);
+    offset = Tuple.writeString(version == null ? CLASSIC_VERSION : version, array, offset);
 
     /* write the identifier */
     offset = Tuple.writeString(id, array, offset);
@@ -288,8 +268,7 @@ public class SubscribeRequestTuple extends RequestTuple
     /* write the partitions */
     if (partitions == null || partitions.isEmpty()) {
       offset = VarInt.write(0, array, offset);
-    }
-    else {
+    } else {
       offset = VarInt.write(partitions.size(), array, offset);
       offset = VarInt.write(mask, array, offset);
       for (int i : partitions) {
@@ -306,7 +285,11 @@ public class SubscribeRequestTuple extends RequestTuple
   @Override
   public String toString()
   {
-    return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + ", bufferSize=" + bufferSize + '}';
+    return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier +
+        ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType +
+        ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask +
+        ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) +
+        ", bufferSize=" + bufferSize + '}';
   }
 
   private static final Logger logger = LoggerFactory.getLogger(SubscribeRequestTuple.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index 1c45cb2..408e8a2 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -126,33 +126,28 @@ public abstract class Tuple
       byte tmp = buffer[offset++];
       if (tmp >= 0) {
         return tmp;
-      }
-      else if (offset < limit) {
+      } else if (offset < limit) {
         int integer = tmp & 0x7f;
         tmp = buffer[offset++];
         if (tmp >= 0) {
           return integer | tmp << 7;
-        }
-        else if (offset < limit) {
+        } else if (offset < limit) {
           integer |= (tmp & 0x7f) << 7;
           tmp = buffer[offset++];
 
           if (tmp >= 0) {
             return integer | tmp << 14;
-          }
-          else if (offset < limit) {
+          } else if (offset < limit) {
             integer |= (tmp & 0x7f) << 14;
             tmp = buffer[offset++];
             if (tmp >= 0) {
               return integer | tmp << 21;
-            }
-            else if (offset < limit) {
+            } else if (offset < limit) {
               integer |= (tmp & 0x7f) << 21;
               tmp = buffer[offset++];
               if (tmp >= 0) {
                 return integer | tmp << 28;
-              }
-              else {
+              } else {
                 throw new NumberFormatException("Invalid varint at location " + offset + " => "
                         + Arrays.toString(Arrays.copyOfRange(buffer, offset, limit)));
               }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
index 07c2f85..014827c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
@@ -49,25 +49,25 @@ public class WindowIdTuple extends Tuple
   @Override
   public int getPartition()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public Slice getData()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getBaseSeconds()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
   public int getWindowWidth()
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
index 25d3742..77aa56f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java
@@ -25,21 +25,13 @@ import com.datatorrent.bufferserver.util.SerializedData;
 
 /**
  *
- * The base class for specifying partition policies, implements interface {@link com.datatorrent.bufferserver.policy.Policy}<p>
- * <br>
+ * The base class for specifying partition policies
  *
  * @since 0.3.2
  */
 public class AbstractPolicy implements Policy
 {
-
-  /**
-   *
-   *
-   * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
-   * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
-   */
-
+  @Override
   public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
   {
     throw new UnsupportedOperationException("Not supported yet.");

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
index 88825ce..0c2819d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java
@@ -33,7 +33,7 @@ import com.datatorrent.bufferserver.util.SerializedData;
  */
 public class GiveAll extends AbstractPolicy
 {
-  final static GiveAll instance = new GiveAll();
+  private static final GiveAll instance = new GiveAll();
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
index 88145fc..c4143be 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java
@@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData;
 
 /**
  *
- * Implements load balancing by sending the tuple to the least busy partition<p>
- * <br>
- * Basic load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * Implements load balancing by sending the tuple to the least busy partition.
+ * Basic load balancing policy. Extends the base class {@link AbstractPolicy}<br>
  *
  * @since 0.3.2
  */
@@ -51,20 +50,13 @@ public class LeastBusy extends AbstractPolicy
   {
   }
 
-  /**
-   *
-   *
-   * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
-   * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
-   */
   @Override
   public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
   {
     PhysicalNode theOne = null;
 
     for (PhysicalNode node: nodes) {
-      if (theOne == null
-              || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) {
+      if (theOne == null || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) {
         theOne = node;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
index 1393667..0080ce0 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java
@@ -33,13 +33,13 @@ import com.datatorrent.bufferserver.util.SerializedData;
 public interface Policy
 {
   /**
+   * Distributes {@code data} to the set of {@code nodes}
    *
-   *
-   * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
-   * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
+   * @param nodes Set of downstream {@link PhysicalNode}
+   * @param data Opaque {@link SerializedData} to be send
    * @throws InterruptedException
+   * @return {@code true} if successful, otherwise {@code false}
    */
-
-  public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException;
+  boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
index aebe450..4f700a6 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java
@@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData;
 
 /**
  *
- * Randomly distributes tuples to downstream nodes. A random load balancing policy<p>
- * <br>
- * A generic random load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * Randomly distributes tuples to downstream nodes. A random load balancing policy.
+ * A generic random load balancing policy. Extends the base class {@link AbstractPolicy}
  *
  * @since 0.3.2
  */
@@ -51,13 +50,6 @@ public class RandomOne extends AbstractPolicy
   {
   }
 
-  /**
-   *
-   *
-   * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
-   * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
-   */
-
   @Override
   public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
index 1185815..7291b7d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java
@@ -25,9 +25,10 @@ import com.datatorrent.bufferserver.util.SerializedData;
 
 /**
  *
- * Distributes to downstream nodes in a roundrobin fashion. A round robin load balancing policy<p>
+ * Distributes to downstream nodes in a round robin fashion. A round robin load balancing policy
  * <br>
- * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br>
+ * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends
+ * the base class {@link AbstractPolicy}<br>
  * <br>
  *
  * @since 0.3.2
@@ -44,18 +45,15 @@ public class RoundRobin extends AbstractPolicy
     index = 0;
   }
 
-  /**
-   *
-   *
-   * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s
-   * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send
-   */
-
   @Override
   public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException
   {
     int size = nodes.size();
-    if (size > 0) { // why do i need to do this check? synchronization issues? because if there is no one interested, the logical group should not exist!
+    /*
+     * why do i need to do this check? synchronization issues? because if there is no one interested,
+     * the logical group should not exist!
+     */
+    if (size > 0) {
       index %= size;
       int count = index++;
       /*

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c4cdf5b..03d96ee 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -91,7 +91,10 @@ public class Server implements ServerListener
     this.blockSize = blocksize;
     this.numberOfCacheBlocks = numberOfCacheBlocks;
     serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper"));
-    storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(numberOfCacheBlocks), new NameableThreadFactory("StorageHelper"), new ThreadPoolExecutor.CallerRunsPolicy());
+    final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(numberOfCacheBlocks);
+    final NameableThreadFactory threadFactory = new NameableThreadFactory("StorageHelper");
+    storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   public void setSpoolStorage(Storage storage)
@@ -168,8 +171,8 @@ public class Server implements ServerListener
 
   private final HashMap<String, DataList> publisherBuffers = new HashMap<String, DataList>();
   private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
-  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>();
-  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>();
+  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
 
@@ -251,7 +254,9 @@ public class Server implements ServerListener
         dl = publisherBuffers.get(upstream_identifier);
         //logger.debug("old list = {}", dl);
       } else {
-        dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
+        dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
+            new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) :
+            new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
         publisherBuffers.put(upstream_identifier, dl);
         //logger.debug("new list = {}", dl);
       }
@@ -305,7 +310,9 @@ public class Server implements ServerListener
         throw new RuntimeException(ie);
       }
     } else {
-      dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks);
+      dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
+          new FastDataList(identifier, blockSize, numberOfCacheBlocks) :
+          new DataList(identifier, blockSize, numberOfCacheBlocks);
       publisherBuffers.put(identifier, dl);
     }
     dl.setSecondaryStorage(storage, storageHelperExecutor);
@@ -439,9 +446,11 @@ public class Server implements ServerListener
 //            bufferSize = 16 * 1024;
 //          }
           if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize);
+            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
+                subscriberRequest.getPartitions(), bufferSize);
           } else {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize)
+            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
+                subscriberRequest.getPartitions(), bufferSize)
             {
               @Override
               public int readSize()
@@ -515,7 +524,8 @@ public class Server implements ServerListener
     @Override
     public void onMessage(byte[] buffer, int offset, int size)
     {
-      logger.warn("Received data when no data is expected: {}", Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
+      logger.warn("Received data when no data is expected: {}",
+          Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
     }
 
     @Override
@@ -535,7 +545,8 @@ public class Server implements ServerListener
     @Override
     public String toString()
     {
-      return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}';
+      return "Server.Subscriber{" + "type=" + type + ", mask=" + mask +
+          ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}';
     }
 
     private volatile boolean torndown;
@@ -600,10 +611,12 @@ public class Server implements ServerListener
     }
 
     /**
-     * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
-     * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)}
-     * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key
-     * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+     * Schedules a task to conditionally resume I/O channel read operations.
+     * No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+     * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+     * Otherwise, calls {@linkplain #read(int) read(0)} to process data
+     * left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+     * in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
      * @return true
      */
     @Override
@@ -770,8 +783,9 @@ public class Server implements ServerListener
        */
 
       /**
-       * since the publisher server died, the queue which it was using would stop pumping the data unless a new publisher comes up with the same name. We leave
-       * it to the stream to decide when to bring up a new node with the same identifier as the one which just died.
+       * since the publisher server died, the queue which it was using would stop pumping the data unless
+       * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+       * with the same identifier as the one which just died.
        */
       if (publisherChannels.containsValue(this)) {
         final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
index 3ddc77f..02ba340 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java
@@ -66,8 +66,7 @@ public class DiskStorage implements Storage
       Character c = name.charAt(i);
       if (Character.isLetterOrDigit(c)) {
         sb.append(c);
-      }
-      else {
+      } else {
         sb.append('-');
       }
     }
@@ -90,30 +89,25 @@ public class DiskStorage implements Storage
             synchronized (this) {
               lUniqueIdentifier = ++this.uniqueIdentifier;
             }
+          } else {
+            throw new IllegalStateException("Collision in identifier name, please ensure that the slug for " +
+                "the identifiers is different");
           }
-          else {
-            throw new IllegalStateException("Collission in identifier name, please ensure that the slug for the identifiers is differents");
-          }
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           throw new RuntimeException(ex);
         }
-      }
-      else {
+      } else {
         throw new IllegalStateException("Identity file is hijacked!");
       }
-    }
-    else {
+    } else {
       if (directory.mkdir()) {
         File identity = new File(directory, "identity");
         try {
           Files.write(identifier.getBytes(), identity);
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           throw new RuntimeException(ex);
         }
-      }
-      else {
+      } else {
         throw new RuntimeException("directory " + directory.getAbsolutePath() + " could not be created!");
       }
 
@@ -122,8 +116,7 @@ public class DiskStorage implements Storage
 
     try {
       return writeFile(bytes, startingOffset, endingOffset, directory, lUniqueIdentifier);
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -144,24 +137,20 @@ public class DiskStorage implements Storage
               if (!deletionFile.delete()) {
                 throw new RuntimeException("File " + deletionFile.getPath() + " could not be deleted!");
               }
-            }
-            else {
+            } else {
               throw new RuntimeException("File " + deletionFile.getPath() + " either is non existent or not a file!");
             }
+          } else {
+            throw new RuntimeException("Collision in the identifier name, please ensure that the slugs for " +
+                "the identifiers are different");
           }
-          else {
-            throw new RuntimeException("Collission in identifier name, please ensure that the slug for the identifiers is differents");
-          }
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           throw new RuntimeException(ex);
         }
-      }
-      else {
+      } else {
         throw new RuntimeException(identityFile + " is not a file!");
       }
-    }
-    else {
+    } else {
       throw new RuntimeException("directory " + directory.getPath() + " does not exist!");
     }
   }
@@ -180,37 +169,31 @@ public class DiskStorage implements Storage
             File filename = new File(directory, String.valueOf(uniqueIdentifier));
             if (filename.exists() && filename.isFile()) {
               return Files.toByteArray(filename);
-            }
-            else {
+            } else {
               throw new RuntimeException("File " + filename.getPath() + " either is non existent or not a file!");
             }
+          } else {
+            throw new RuntimeException("Collision in the identifier name," +
+                " please ensure that the slugs for the identifiers [" + identifier + "], and [" +  new String(stored) +
+                "] are different.");
           }
-          else {
-            throw new RuntimeException("Collision in identifier name, please ensure that the slugs for the identifiers [" + identifier + "], and [" +  new String(stored) + "] are different.");
-          }
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           throw new RuntimeException(ex);
         }
-      }
-      else {
+      } else {
         throw new RuntimeException(identityFile + " is not a file!");
       }
-    }
-    else {
+    } else {
       throw new RuntimeException("directory " + directory.getPath() + " does not exist!");
     }
   }
 
-  protected int writeFile(byte[] bytes, int startingOffset, int endingOffset, File directory, final int number) throws IOException
+  protected int writeFile(final byte[] bytes, final int startingOffset, final int endingOffset, final File directory,
+      final int number) throws IOException
   {
-    FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)));
-    try {
+    try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)))) {
       stream.write(bytes, startingOffset, endingOffset - startingOffset);
     }
-    finally {
-      stream.close();
-    }
     return number;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
index 16f443f..0bc065e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java
@@ -21,8 +21,8 @@ package com.datatorrent.bufferserver.util;
 import com.datatorrent.netlet.util.Slice;
 
 /**
- * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying array.<p>
- *
+ * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying
+ * array.<p>
  *
  * <b>Note:</b> Multibyte accessors all use big-endian order.
  *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 0b2b67a..124cc5f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.bufferserver.util;
 
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.EventLoop;
-
 import java.io.IOException;
 import java.util.HashMap;
 
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.EventLoop;
+
 /**
  * <p>System class.</p>
  *
@@ -40,8 +40,7 @@ public class System
       if (el == null) {
         try {
           eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier));
-        }
-        catch (IOException io) {
+        } catch (IOException io) {
           throw new RuntimeException(io);
         }
       }
@@ -55,8 +54,7 @@ public class System
       DefaultEventLoop el = eventloops.get(identifier);
       if (el == null) {
         throw new RuntimeException("System with " + identifier + " not setup!");
-      }
-      else {
+      } else {
         el.stop();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
index 6f12cc4..d8583fb 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java
@@ -38,18 +38,15 @@ public class VarInt extends com.datatorrent.netlet.util.VarInt
     int result = tmp & 0x7f;
     if ((tmp = data[offset++]) >= 0) {
       result |= tmp << 7;
-    }
-    else {
+    } else {
       result |= (tmp & 0x7f) << 7;
       if ((tmp = data[offset++]) >= 0) {
         result |= tmp << 14;
-      }
-      else {
+      } else {
         result |= (tmp & 0x7f) << 14;
         if ((tmp = data[offset++]) >= 0) {
           result |= tmp << 21;
-        }
-        else {
+        } else {
           result |= (tmp & 0x7f) << 21;
           result |= (tmp = data[offset++]) << 28;
           if (tmp < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
index ee56e4d..234fb12 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
+
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -40,6 +40,9 @@ import com.datatorrent.bufferserver.support.Subscriber;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.DefaultEventLoop;
 
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
 /**
  *
  */
@@ -57,8 +60,7 @@ public class SubscriberTest
     try {
       eventloopServer = DefaultEventLoop.createEventLoop("server");
       eventloopClient = DefaultEventLoop.createEventLoop("client");
-    }
-    catch (IOException ioe) {
+    } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
     eventloopServer.start();
@@ -66,7 +68,8 @@ public class SubscriberTest
 
     instance = new Server(0, 64, 2);
     address = instance.run(eventloopServer);
-    assert (address instanceof InetSocketAddress);
+    assertTrue(address instanceof InetSocketAddress);
+    assertFalse(address.isUnresolved());
   }
 
   @AfterClass
@@ -82,7 +85,7 @@ public class SubscriberTest
   public void test() throws InterruptedException
   {
     final Publisher bsp1 = new Publisher("MyPublisher");
-    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp1);
+    eventloopClient.connect(address, bsp1);
 
     final Subscriber bss1 = new Subscriber("MySubscriber")
     {
@@ -104,7 +107,7 @@ public class SubscriberTest
       }
 
     };
-    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss1);
+    eventloopClient.connect(address, bss1);
 
     final int baseWindow = 0x7afebabe;
     bsp1.activate(null, baseWindow, 0);
@@ -131,12 +134,9 @@ public class SubscriberTest
             windowId++;
             Thread.sleep(5);
           }
-        }
-        catch (InterruptedException ex) {
-        }
-        catch (CancelledKeyException cke) {
-        }
-        finally {
+        } catch (InterruptedException | CancelledKeyException e) {
+          logger.debug("{}", e);
+        } finally {
           logger.debug("publisher the middle of window = {}", Codec.getStringWindowId(windowId));
         }
       }
@@ -158,7 +158,7 @@ public class SubscriberTest
      * subscribe from 8 onwards. What we should see is that subscriber gets the new data from 8 onwards.
      */
     final Publisher bsp2 = new Publisher("MyPublisher");
-    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp2);
+    eventloopClient.connect(address, bsp2);
     bsp2.activate(null, 0x7afebabe, 5);
 
     final Subscriber bss2 = new Subscriber("MyPublisher")
@@ -175,7 +175,7 @@ public class SubscriberTest
       }
 
     };
-    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss2);
+    eventloopClient.connect(address, bss2);
     bss2.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0x7afebabe00000008L, 0);
 
 
@@ -200,12 +200,9 @@ public class SubscriberTest
             windowId++;
             Thread.sleep(5);
           }
-        }
-        catch (InterruptedException ex) {
-        }
-        catch (CancelledKeyException cke) {
-        }
-        finally {
+        } catch (InterruptedException | CancelledKeyException e) {
+          logger.debug("", e);
+        } finally {
           logger.debug("publisher in the middle of window = {}", Codec.getStringWindowId(windowId));
         }
       }
@@ -221,7 +218,7 @@ public class SubscriberTest
     eventloopClient.disconnect(bsp2);
     eventloopClient.disconnect(bss2);
 
-    Assert.assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get());
+    assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
index 5dc581c..04767ef 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java
@@ -18,34 +18,17 @@
  */
 package com.datatorrent.bufferserver.packet;
 
-import junit.framework.TestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.testng.annotations.Test;
 
 
 /**
  *
  */
-public class NoMessageTupleTest extends TestCase
+public class NoMessageTupleTest
 {
-  public NoMessageTupleTest(String testName)
-  {
-    super(testName);
-  }
-
-  @Override
-  protected void setUp() throws Exception
-  {
-    super.setUp();
-  }
-
-  @Override
-  protected void tearDown() throws Exception
-  {
-    super.tearDown();
-  }
-
   @Test
   public void testSerDe()
   {
@@ -54,7 +37,7 @@ public class NoMessageTupleTest extends TestCase
     byte[] serialized = NoMessageTuple.getSerializedTuple();
     Tuple t = Tuple.getTuple(serialized, 0, serialized.length);
 
-    assert(t.getType() == MessageType.NO_MESSAGE);
+    assert t.getType() == MessageType.NO_MESSAGE;
   }
 
   private static final Logger logger = LoggerFactory.getLogger(NoMessageTupleTest.class);