You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:08:00 UTC

apex-malhar git commit: fixing all checkstyle violations, deleting maxAllowedViolations config in pom

Repository: apex-malhar
Updated Branches:
  refs/heads/master 4cbbb7507 -> 623b803f5


fixing all checkstyle violations,
deleting maxAllowedViolations config in pom


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/623b803f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/623b803f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/623b803f

Branch: refs/heads/master
Commit: 623b803f5b960b3e33054e2b60e8c156b7b0150a
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 17:14:32 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:48:58 2017 -0800

----------------------------------------------------------------------
 kafka/pom.xml                                   |  8 ---
 .../kafka/AbstractKafkaInputOperator.java       | 48 +++++++--------
 .../kafka/AbstractKafkaOutputOperator.java      |  4 +-
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 32 +++++-----
 .../apex/malhar/kafka/KafkaConsumerWrapper.java | 13 ++---
 .../apex/malhar/kafka/KafkaPartition.java       |  4 --
 ...afkaSinglePortExactlyOnceOutputOperator.java | 61 ++++++++++----------
 .../kafka/KafkaSinglePortOutputOperator.java    | 11 ++--
 .../apex/malhar/kafka/OneToManyPartitioner.java |  6 +-
 .../apex/malhar/kafka/OneToOnePartitioner.java  |  3 +-
 .../apex/malhar/kafka/PartitionStrategy.java    |  3 +-
 .../apache/apex/malhar/kafka/EmbeddedKafka.java |  7 ++-
 .../kafka/KafkaConsumerPropertiesTest.java      | 10 ++--
 .../apache/apex/malhar/kafka/KafkaHelper.java   |  3 +-
 .../malhar/kafka/KafkaInputOperatorTest.java    | 50 ++++++++--------
 .../malhar/kafka/KafkaOperatorTestBase.java     | 25 ++++----
 .../malhar/kafka/KafkaOutputOperatorTest.java   | 35 ++++++-----
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  8 ++-
 .../apex/malhar/kafka/KafkaTestProducer.java    | 20 ++++---
 19 files changed, 182 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 590fa09..654c62c 100755
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -177,14 +177,6 @@
          </execution>
        </executions>
      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <maxAllowedViolations>35</maxAllowedViolations>
-          <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
-        </configuration>
-      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 6fc7693..1d05580 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -65,7 +65,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  *
  * <ol>
  * <li>Out-of-box One-to-one and one-to-many partition strategy support plus customizable partition strategy
- *    refer to AbstractKafkaPartitioner </li>
+ * refer to AbstractKafkaPartitioner </li>
  * <li>Fault-tolerant when the input operator goes down, it redeploys on other node</li>
  * <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
  * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
@@ -77,7 +77,9 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.3.0
  */
 @InterfaceStability.Evolving
-public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+public abstract class AbstractKafkaInputOperator implements InputOperator,
+    Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
+    Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
 {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -92,7 +94,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   {
     EARLIEST, // consume from beginning of the partition every time when application restart
     LATEST, // consume from latest of the partition every time when application restart
-    APPLICATION_OR_EARLIEST, // consume from committed position from last run or earliest if there is no committed offset(s)
+    // consume from committed position from last run or earliest if there is no committed offset(s)
+    APPLICATION_OR_EARLIEST,
     APPLICATION_OR_LATEST // consume from committed position from last run or latest if there is no committed offset(s)
   }
 
@@ -103,7 +106,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   private String[] topics;
 
   /**
-   *  offset track for checkpoint
+   * offset track for checkpoint
    */
   private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
 
@@ -148,6 +151,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
   /**
    * By default the strategy is one to one
+   *
    * @see PartitionStrategy
    */
   private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
@@ -161,7 +165,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   /**
    * store offsets with window id, only keep offsets with windows that have not been committed
    */
-  private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
+  private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory =
+      new LinkedList<>();
 
   /**
    * Application name is used as group.id for kafka consumer
@@ -212,7 +217,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
       return;
     }
     //ask kafka consumer wrapper to store the committed offsets
-    for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
+    for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter =
+        offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
       if (item.getLeft() <= windowId) {
         if (item.getLeft() == windowId) {
@@ -302,8 +308,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     }
   }
 
-
-
   @Override
   public void setup(Context.OperatorContext context)
   {
@@ -314,7 +318,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     operatorId = context.getId();
   }
 
-
   @Override
   public void teardown()
   {
@@ -382,8 +385,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   /**
-   *
    * A callback from consumer after it commits the offset
+   *
    * @param map
    * @param e
    */
@@ -435,9 +438,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   /**
-   *  Same setting as bootstrap.servers property to KafkaConsumer
-   *  refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
-   *  To support multi cluster, you can have multiple bootstrap.servers separated by ";"
+   * Same setting as bootstrap.servers property to KafkaConsumer
+   * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+   * To support multi cluster, you can have multiple bootstrap.servers separated by ";"
    */
   public String getClusters()
   {
@@ -474,13 +477,13 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   /**
-   *  Initial offset, it should be one of the following
-   *  <ul>
-   *    <li>earliest</li>
-   *    <li>latest</li>
-   *    <li>application_or_earliest</li>
-   *    <li>application_or_latest</li>
-   *  </ul>
+   * Initial offset, it should be one of the following
+   * <ul>
+   * <li>earliest</li>
+   * <li>latest</li>
+   * <li>application_or_earliest</li>
+   * <li>application_or_latest</li>
+   * </ul>
    */
   public String getInitialOffset()
   {
@@ -512,7 +515,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
    * <li>key.deserializer</li>
    * <li>value.deserializer</li>
    * </ul>
-   *
    */
   public Properties getConsumerProps()
   {
@@ -534,7 +536,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
   /**
    * @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
-   *   org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
+   * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
    */
   public long getConsumerTimeout()
   {
@@ -610,8 +612,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   /**
-   * @omitFromUI
    * @return current checkpointed offsets
+   * @omitFromUI
    */
   public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
index f38ead9..0e16fe1 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
@@ -20,10 +20,12 @@
 package org.apache.apex.malhar.kafka;
 
 import java.util.Properties;
+
 import javax.validation.constraints.NotNull;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Operator;
 
@@ -99,7 +101,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
    */
   public void setProperty(Object key, Object val)
   {
-    properties.put(key,val);
+    properties.put(key, val);
   }
 
   public String getTopic()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c9b40be..791972f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -47,8 +47,6 @@ import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
-import kafka.common.AuthorizationException;
-
 /**
  * Abstract partitioner used to manage the partitions of kafka input operator.
  * It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that
@@ -74,8 +72,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
 
   private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
 
-
-  private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); // prevent null
+  // prevent null
+  private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>();
 
   public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator)
   {
@@ -84,12 +82,11 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     this.prototypeOperator = prototypeOperator;
   }
 
-  abstract List<Set<PartitionMeta>> assign(Map<String, Map<String,List<PartitionInfo>>> metadata);
-
-
+  abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata);
 
   @Override
-  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
+  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
   {
     initMetadataClients();
 
@@ -127,7 +124,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
           } //end while
 
           if (tryTime == 0) {
-            throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
+            throw new RuntimeException(
+                "Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
           }
         }
       }
@@ -143,7 +141,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       e.printStackTrace();
     }
 
-
     if (currentPartitions == parts || currentPartitions.equals(parts)) {
       logger.debug("No partition change found");
       return collection;
@@ -153,7 +150,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       currentPartitions.addAll(parts);
       int i = 0;
       List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>();
-      for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext();) {
+      for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext(); ) {
         Partition<AbstractKafkaInputOperator> nextPartition = iter.next();
         if (parts.remove(nextPartition.getPartitionedInstance().assignment())) {
           if (logger.isInfoEnabled()) {
@@ -186,7 +183,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     metadataRefreshClients = null;
   }
 
-
   @Override
   public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
   {
@@ -201,12 +197,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     return response;
   }
 
-  protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
+  protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(
+      Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
   {
-    Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
+    Partitioner.Partition<AbstractKafkaInputOperator> p =
+        new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
     p.getPartitionedInstance().assign(partitionAssignment);
     return p;
   }
+
   /**
    *
    */
@@ -243,12 +242,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
    * @param prop
    * @return String
    */
-  private String getPropertyAsString(Properties prop) {
+  private String getPropertyAsString(Properties prop)
+  {
     StringWriter writer = new StringWriter();
     try {
       prop.store(writer, "");
     } catch (IOException e) {
-      logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() );
+      logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage());
     }
     return writer.getBuffer().toString();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index fa4856e..2b3c762 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -133,7 +133,8 @@ public class KafkaConsumerWrapper implements Closeable
           try {
             kc.position(tp);
           } catch (NoOffsetForPartitionException e) {
-            //the poll() method of a consumer will throw exception if any of subscribed consumers not initialized with position
+            //the poll() method of a consumer will throw exception
+            // if any of subscribed consumers not initialized with position
             handleNoOffsetForPartitionException(e, kc);
           }
           kc.pause(tp);
@@ -145,7 +146,7 @@ public class KafkaConsumerWrapper implements Closeable
       while (windowCount > 0) {
         try {
           ConsumerRecords<byte[], byte[]> records = kc.poll(ownerOperator.getConsumerTimeout());
-          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0;) {
+          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) {
             ownerOperator.emitTuple(meta.getCluster(), cri.next());
             windowCount--;
           }
@@ -194,7 +195,6 @@ public class KafkaConsumerWrapper implements Closeable
     {
       try {
 
-
         while (wrapper.isAlive.get()) {
           if (wrapper.waitForReplay) {
             Thread.sleep(100);
@@ -229,7 +229,8 @@ public class KafkaConsumerWrapper implements Closeable
     }
   }
 
-  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer)
+  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
+      KafkaConsumer<byte[], byte[]> consumer)
   {
     // if initialOffset is set to EARLIST or LATEST
     // and the application is run as first time
@@ -260,7 +261,6 @@ public class KafkaConsumerWrapper implements Closeable
     }
   }
 
-
   /**
    * This method is called in the activate method of the operator
    */
@@ -289,7 +289,6 @@ public class KafkaConsumerWrapper implements Closeable
 
     Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();
 
-
     //  create one thread for each cluster
     // each thread use one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s)
     for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
@@ -334,7 +333,6 @@ public class KafkaConsumerWrapper implements Closeable
       kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
     }
 
-
   }
 
   /**
@@ -375,7 +373,6 @@ public class KafkaConsumerWrapper implements Closeable
     holdingBuffer.put(msg);
   }
 
-
   @Override
   public void close() throws IOException
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
index 1646ffe..a07fe33 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.kafka;
 
-
 import java.io.Serializable;
 
 import org.apache.hadoop.classification.InterfaceStability;
@@ -54,7 +53,6 @@ public class KafkaPartition implements Serializable
    */
   private static final long serialVersionUID = 7556802229202221546L;
 
-
   private String clusterId;
 
   private int partitionId;
@@ -141,6 +139,4 @@ public class KafkaPartition implements Serializable
     return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]";
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index ff16610..a8e333f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -52,47 +52,47 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
 
 /**
  * Kafka output operator with exactly once processing semantics.
- *<br>
+ * <br>
  *
- *  <p>
+ * <p>
  * <b>Requirements</b>
  * <li>In the Kafka message, only Value will be available for users</li>
  * <li>Users need to provide Value deserializers for Kafka message as it is used during recovery</li>
- * <li>Value type should have well defined Equals & HashCodes, as during messages are stored in HashMaps for comparison.</li>
+ * <li>Value type should have well defined Equals & HashCodes,
+ * as during messages are stored in HashMaps for comparison.</li>
  * <p>
  * <b>Recovery handling</b>
  * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li>
  * <li> During recovery,
  * <ul>
- *    <li>Partially written Streaming Window before the crash is constructed. ( Explained below ) </li>
- *    <li>Tuples from the completed Streaming Window's are skipped </li>
- *    <li>Tuples coming for the partially written Streaming Window are skipped.
- *       (No assumption is made on the order and the uniqueness of the tuples) </li>
- *  </ul>
- *  </li>
- *</p>
+ * <li>Partially written Streaming Window before the crash is constructed. ( Explained below ) </li>
+ * <li>Tuples from the completed Streaming Window's are skipped </li>
+ * <li>Tuples coming for the partially written Streaming Window are skipped.
+ * (No assumption is made on the order and the uniqueness of the tuples) </li>
+ * </ul>
+ * </li>
+ * </p>
  *
  * <p>
  * <b>Partial Window Construction</b>
  * <li> Operator uses the Key in the Kafka message, which is not available for use by the operator users.</li>
  * <li> Key is used to uniquely identify the message written by the particular instance of this operator.</li>
- *    This allows multiple writers to same Kafka partitions. Format of the key is "APPLICATTION_ID#OPERATOR_ID".
+ * This allows multiple writers to same Kafka partitions. Format of the key is "APPLICATTION_ID#OPERATOR_ID".
  * <li>During recovery Kafka partitions are read between the latest offset and the last written offsets.</li>
- *<li>All the tuples written by the particular instance is kept in the Map</li>
- *</p>
+ * <li>All the tuples written by the particular instance is kept in the Map</li>
+ * </p>
  *
  * <p>
  * <b>Limitations</b>
  * <li> Key in the Kafka message is reserved for Operator's use </li>
- * <li> During recovery, operator needs to read tuples between 2 offsets, if there are lot of data to be read, Operator may
- *    appear to be blocked to the Stram and can kill the operator. </li>
- *</p>
+ * <li> During recovery, operator needs to read tuples between 2 offsets,
+ * if there are lot of data to be read, Operator may
+ * appear to be blocked to the Stram and can kill the operator. </li>
+ * </p>
  *
  * @displayName Kafka Single Port Exactly Once Output(0.9.0)
  * @category Messaging
  * @tags output operator
- *
- *
  * @since 3.5.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
@@ -128,7 +128,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
 
     if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
-      throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery.");
+      throw new IllegalArgumentException(
+          "Value deserializer needs to be set for the operator, as it is used during recovery.");
     }
 
     super.setup(context);
@@ -242,19 +243,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     return false;
   }
 
-  private Map<Integer,Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
+  private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
   {
     List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
     List<TopicPartition> topicPartitionList = new java.util.ArrayList<>();
 
-    for ( PartitionInfo partitionInfo: partitionInfoList) {
-      topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()) );
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
     }
 
-    Map<Integer,Long> parttionsAndOffset = new HashMap<>();
+    Map<Integer, Long> parttionsAndOffset = new HashMap<>();
     consumer.assign(topicPartitionList);
 
-    for (PartitionInfo partitionInfo: partitionInfoList) {
+    for (PartitionInfo partitionInfo : partitionInfoList) {
       try {
         TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
         if (latest) {
@@ -275,11 +276,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   {
     logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow());
 
-    Map<Integer,Long> storedOffsets;
-    Map<Integer,Long> currentOffsets;
+    Map<Integer, Long> storedOffsets;
+    Map<Integer, Long> currentOffsets;
 
     try {
-      storedOffsets = (Map<Integer,Long>)this.windowDataManager.retrieve(windowId);
+      storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(windowId);
       currentOffsets = getPartitionsAndOffsets(true);
     } catch (IOException | ExecutionException | InterruptedException e) {
       throw new RuntimeException(e);
@@ -303,13 +304,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
     List<TopicPartition> topicPartitions = new ArrayList<>();
 
-    for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
       topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
     }
 
     consumer.assign(topicPartitions);
 
-    for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
       Long storedOffset = 0L;
       Integer currentPartition = entry.getKey();
       Long currentOffset = entry.getValue();
@@ -390,7 +391,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
       return;
     }
 
-    getProducer().send(new ProducerRecord<>(getTopic(), key, tuple),new Callback()
+    getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
     {
       public void onCompletion(RecordMetadata metadata, Exception e)
       {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
index 500602c..c47cf3d 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
@@ -20,6 +20,7 @@
 package org.apache.apex.malhar.kafka;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+
 import com.datatorrent.api.DefaultInputPort;
 
 /**
@@ -29,17 +30,17 @@ import com.datatorrent.api.DefaultInputPort;
  * @since 3.5.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public class KafkaSinglePortOutputOperator<K,V> extends AbstractKafkaOutputOperator
+public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator
 {
-    /**
-     * This input port receives tuples that will be written out to Kafka.
-     */
+  /**
+   * This input port receives tuples that will be written out to Kafka.
+   */
   public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>()
   {
     @Override
     public void process(V tuple)
     {
-      getProducer().send(new ProducerRecord<K,V>(getTopic(),tuple));
+      getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple));
     }
   };
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 3b4d3f3..eb0cc40 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -50,7 +50,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
     }
 
     int partitionCount = prototypeOperator.getInitialPartitionCount();
-    ArrayList<Set<PartitionMeta>> eachPartitionAssignment = new ArrayList<>(prototypeOperator.getInitialPartitionCount());
+    ArrayList<Set<PartitionMeta>> eachPartitionAssignment =
+        new ArrayList<>(prototypeOperator.getInitialPartitionCount());
     int i = 0;
     for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
       for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
@@ -59,7 +60,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
           if (index >= eachPartitionAssignment.size()) {
             eachPartitionAssignment.add(new HashSet<PartitionMeta>());
           }
-          eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
+          eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(),
+              topicPartition.getKey(), pif.partition()));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
index 570bdea..05faab6 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -50,7 +50,8 @@ public class OneToOnePartitioner extends AbstractKafkaPartitioner
     for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
       for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
         for (PartitionInfo pif : topicPartition.getValue()) {
-          currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())));
+          currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(),
+              topicPartition.getKey(), pif.partition())));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
index 7c142c5..feafa3b 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -31,7 +31,8 @@ public enum PartitionStrategy
    */
   ONE_TO_ONE,
   /**
-   * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
+   * Each operator consumes from several kafka partitions with overall input rate under
+   * some certain hard limit in msgs/s or bytes/s
    * For now it <b>only</b> support <b>simple kafka consumer</b>
    */
   ONE_TO_MANY,

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
index 5ddcb18..e9fcc36 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -118,7 +118,7 @@ public class EmbeddedKafka
   {
     Properties producerProps = new Properties();
     producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-    producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+    producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
     producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
 
     try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
@@ -144,9 +144,10 @@ public class EmbeddedKafka
     consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
     consumerProps.setProperty("group.id", "group0");
     consumerProps.setProperty("client.id", "consumer0");
-    consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+    consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
     consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-    consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");  // to make sure the consumer starts from the beginning of the topic
+    // to make sure the consumer starts from the beginning of the topic
+    consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
     KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
     consumer.subscribe(Arrays.asList(topic));
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
index 83e0de6..81d7ade 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -41,6 +41,7 @@ public class KafkaConsumerPropertiesTest
   public class Watcher extends TestWatcher
   {
     Context.OperatorContext context;
+
     @Override
     protected void starting(Description description)
     {
@@ -50,8 +51,8 @@ public class KafkaConsumerPropertiesTest
       kafkaInput.setTopics("apexTest");
       kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
       Properties prop = new Properties();
-      prop.setProperty("security.protocol","SASL_PLAINTEXT");
-      prop.setProperty("sasl.kerberos.service.name","kafka");
+      prop.setProperty("security.protocol", "SASL_PLAINTEXT");
+      prop.setProperty("sasl.kerberos.service.name", "kafka");
       kafkaInput.setConsumerProps(prop);
     }
 
@@ -71,8 +72,9 @@ public class KafkaConsumerPropertiesTest
         kafkaInput.definePartitions(null, null);
       } catch (KafkaException e) {
         //Ensures the  properties of the consumer are set/not reset.
-        Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " +
-          "secure mode.", e.getCause().getMessage());
+        Assert.assertEquals(
+            "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in "
+            + "secure mode.", e.getCause().getMessage());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
index c550032..abf3a5b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -25,7 +25,8 @@ import java.util.Map;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
-public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person>
+public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>,
+    Deserializer<KafkaOutputOperatorTest.Person>
 {
   @Override
   public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 4e97d72..f16c8f4 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -21,14 +21,10 @@ package org.apache.apex.malhar.kafka;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -54,7 +50,8 @@ import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.StramLocalCluster;
 
 /**
- * A bunch of test to verify the input operator will be automatically partitioned per kafka partition This test is launching its
+ * A bunch of test to verify the input operator will be automatically partitioned
+ * per kafka partition This test is launching its
  * own Kafka cluster.
  */
 @RunWith(Parameterized.class)
@@ -90,7 +87,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   @Rule
   public final KafkaTestInfo testInfo = new KafkaTestInfo();
 
-
   @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
   public static Collection<Object[]> testScenario()
   {
@@ -106,8 +102,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     });
   }
 
-
-
   @Before
   public void before()
   {
@@ -196,7 +190,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       endTuples = 0;
     }
 
-
     public void processTuple(byte[] bt)
     {
       String tuple = new String(bt);
@@ -220,7 +213,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
         String key = operatorId + "," + currentWindowId;
         List<String> msgsInWin = tupleCollectedInWindow.get(key);
         if (msgsInWin != null) {
-          Assert.assertEquals("replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
+          Assert.assertEquals(
+              "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
         } else {
           List<String> newList = Lists.newArrayList();
           newList.addAll(windowTupleCollector);
@@ -235,10 +229,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       int countDownTupleSize = countDownAll ? tupleSize : endTuples;
 
       if (latch != null) {
-        Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+        Assert.assertTrue(
+            "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
         while (countDownTupleSize > 0) {
-            latch.countDown();
-            --countDownTupleSize;
+          latch.countDown();
+          --countDownTupleSize;
         }
         if (latch.getCount() == 0) {
           /**
@@ -265,7 +260,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   }
 
-
   /**
    * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
    * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
@@ -283,7 +277,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     testInputOperator(false, false);
   }
 
-
   @Test
   public void testInputOperatorWithFailure() throws Exception
   {
@@ -303,7 +296,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     // each broker should get a END_TUPLE message
     latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
 
-    logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
+    logger.info(
+        "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
+        " hasMultiPartition: {}, partition: {}",
         testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
 
     // Start producer
@@ -319,7 +314,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     DAG dag = lma.getDAG();
 
     // Create KafkaSinglePortStringInputOperator
-    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class);
+    KafkaSinglePortInputOperator node = dag.addOperator(
+        "Kafka input" + testName, KafkaSinglePortInputOperator.class);
     node.setInitialPartitionCount(1);
     // set topic
     node.setTopics(testName);
@@ -330,13 +326,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       node.setWindowDataManager(new FSWindowDataManager());
     }
 
-
     // Create Test tuple collector
     CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
     collector.isIdempotentTest = idempotent;
 
     // Connect ports
-    dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+    dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
+        .setLocality(Locality.CONTAINER_LOCAL);
 
     if (hasFailure) {
       setupHasFailureTest(node, dag);
@@ -347,7 +343,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     lc.setHeartbeatMonitoringEnabled(false);
 
     //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
-    //but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated.
+    //but Controller.runAsync() don't expose the thread which run it,
+    //so we don't know when the thread will be terminated.
     //create this thread and then call join() to make sure the Controller shutdown completely.
     monitorThread = new Thread((StramLocalCluster)lc, "master");
     monitorThread.start();
@@ -370,21 +367,23 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
           expectedReceiveCount, testName, tupleCollection);
     }
-    Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout);
+    Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+        + tupleCollection, notTimeout);
 
     // Check results
-    Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+    Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+        + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
         expectedReceiveCount == tupleCollection.size());
 
     logger.info("End of test case: {}", testName);
   }
 
-
   private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
   {
     operator.setHoldingBufferSize(5000);
     dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
-    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
+    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
+    //  APPLICATION_PATH + "failureck", new Configuration()));
     operator.setMaxTuplesPerWindow(tuplesPerWindow);
   }
 
@@ -394,8 +393,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     return l + TEST_KAFKA_BROKER_PORT[0][0] +
       (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
       (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
-      (hasMultiCluster && hasMultiPartition ? "," + l  + TEST_KAFKA_BROKER_PORT[1][1] : "");
+      (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
index a05fd9b..3910546 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -72,9 +72,9 @@ public class KafkaOperatorTestBase
     }
 
     TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
-    TEST_KAFKA_BROKER_PORT = new int[][] {
-      new int[] {p[2], p[3]},
-      new int[] {p[4], p[5]}
+    TEST_KAFKA_BROKER_PORT = new int[][]{
+      new int[]{p[2], p[3]},
+      new int[]{p[4], p[5]}
     };
   }
 
@@ -93,8 +93,10 @@ public class KafkaOperatorTestBase
 
   private static final String zkBaseDir = "zookeeper-server-data";
   private static final String kafkaBaseDir = "kafka-server-data";
-  private static final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
-  private static final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
+  private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
+  private static final String[][] kafkadir = new String[][]{
+      new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"},
+      new String[]{"kafka-server-data/2/1", "kafka-server-data/2/2"}};
   protected boolean hasMultiPartition = false;
   protected boolean hasMultiCluster = false;
 
@@ -132,8 +134,8 @@ public class KafkaOperatorTestBase
         zkf.shutdown();
       }
     }
-    zkServer =  new ZooKeeperServer[2];
-    zkFactory =  new ServerCnxnFactory[2];
+    zkServer = new ZooKeeperServer[2];
+    zkFactory = new ServerCnxnFactory[2];
   }
 
   public static void startKafkaServer(int clusterid, int brokerid)
@@ -156,7 +158,8 @@ public class KafkaOperatorTestBase
   {
 
     FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
-    //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+    //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
+    //  new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
     startKafkaServer(0, 0);
     startKafkaServer(0, 1);
     startKafkaServer(1, 0);
@@ -261,13 +264,15 @@ public class KafkaOperatorTestBase
       // TODO Auto-generated constructor stub
     }
 
-    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
+        throws IOException
     {
       super(txnLogFactory, tickTime, treeBuilder);
       // TODO Auto-generated constructor stub
     }
 
-    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
+        int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
     {
       super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
       // TODO Auto-generated constructor stub

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index 58d69f6..7abf0f8 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -31,7 +31,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -145,7 +144,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
   }
 
-  private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
+  private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
+      boolean differentTuplesAfterRecovery) throws InterruptedException
   {
     Properties props = new Properties();
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
@@ -159,7 +159,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
     attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
 
-    OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
+    OperatorContextTestHelper.TestIdOperatorContext operatorContext =
+        new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
 
     cleanUp(operatorContext);
 
@@ -167,11 +168,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     DefaultInputPort<Person> inputPort;
 
     if (exactlyOnce) {
-      KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+      KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+          ResetKafkaOutput(testName, props, operatorContext);
       inputPort = kafkaOutputTemp.inputPort;
       kafkaOutput = kafkaOutputTemp;
     } else {
-      KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+      KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+          ResetKafkaSimpleOutput(testName, props, operatorContext);
       inputPort = kafkaOutputTemp.inputPort;
       kafkaOutput = kafkaOutputTemp;
     }
@@ -192,11 +195,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
 
     if (hasFailure) {
       if (exactlyOnce) {
-        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+            ResetKafkaOutput(testName, props, operatorContext);
         inputPort = kafkaOutputTemp.inputPort;
         kafkaOutput = kafkaOutputTemp;
       } else {
-        KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+        KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+            ResetKafkaSimpleOutput(testName, props, operatorContext);
         inputPort = kafkaOutputTemp.inputPort;
         kafkaOutput = kafkaOutputTemp;
       }
@@ -225,7 +230,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     cleanUp(operatorContext);
   }
 
-  private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(
+      String testName, Properties props, Context.OperatorContext operatorContext)
   {
     KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
     kafkaOutput.setTopic(testName);
@@ -235,9 +241,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     return kafkaOutput;
   }
 
-  private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(
+      String testName, Properties props, Context.OperatorContext operatorContext)
   {
-    KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+    KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
     kafkaOutput.setTopic(testName);
     kafkaOutput.setProperties(props);
     kafkaOutput.setup(operatorContext);
@@ -263,7 +270,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     }
 
     for (int i = 0; i < fromKafka.size(); ++i) {
-      if ( !fromKafka.get(i).equals(toKafka.get(i))) {
+      if (!fromKafka.get(i).equals(toKafka.get(i))) {
         return false;
       }
     }
@@ -275,9 +282,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
   {
     String l = "localhost:";
     return l + TEST_KAFKA_BROKER_PORT[0][0] +
-        (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
-        (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
-        (hasMultiCluster && hasMultiPartition ? "," + l  + TEST_KAFKA_BROKER_PORT[1][1] : "");
+      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+      (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
   }
 
   private List<Person> GenerateList()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
index 21f8977..6098bde 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -33,11 +33,13 @@ import kafka.utils.VerifiableProperties;
  */
 public class KafkaTestPartitioner implements Partitioner
 {
-  public KafkaTestPartitioner(VerifiableProperties props) {
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
 
   }
 
-  public KafkaTestPartitioner() {
+  public KafkaTestPartitioner()
+  {
 
   }
 
@@ -45,7 +47,7 @@ public class KafkaTestPartitioner implements Partitioner
   public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
   {
     int num_partitions = cluster.partitionsForTopic(topic).size();
-    return Integer.parseInt((String)key)%num_partitions;
+    return Integer.parseInt((String)key) % num_partitions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
index ca6cc98..0f18666 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -62,7 +62,8 @@ public class KafkaTestProducer implements Runnable
     this.sendCount = sendCount;
   }
 
-  public void setMessages(List<String> messages) {
+  public void setMessages(List<String> messages)
+  {
     this.messages = messages;
   }
 
@@ -72,8 +73,8 @@ public class KafkaTestProducer implements Runnable
     props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
-    String brokerList = "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
-    brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]):"";
+    String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
+    brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]) : "";
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
     props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
     props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
@@ -94,14 +95,15 @@ public class KafkaTestProducer implements Runnable
     this.hasPartition = hasPartition;
     this.hasMultiCluster = hasMultiCluster;
     producer = new KafkaProducer<>(createProducerConfig(0));
-    if(hasMultiCluster){
+    if (hasMultiCluster) {
       producer1 = new KafkaProducer<>(createProducerConfig(1));
     } else {
       producer1 = null;
     }
   }
 
-  public KafkaTestProducer(String topic, boolean hasPartition) {
+  public KafkaTestProducer(String topic, boolean hasPartition)
+  {
     this(topic, hasPartition, false);
   }
 
@@ -115,7 +117,7 @@ public class KafkaTestProducer implements Runnable
       String messageStr = "_" + messageNo++;
       int k = rand.nextInt(100);
       sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
-      if(hasMultiCluster && messageNo <= sendCount){
+      if (hasMultiCluster && messageNo <= sendCount) {
         messageStr = "_" + messageNo++;
         sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
       }
@@ -142,7 +144,7 @@ public class KafkaTestProducer implements Runnable
     }
 
     producer.flush();
-    if (producer1!=null) {
+    if (producer1 != null) {
       producer1.flush();
     }
 
@@ -160,7 +162,7 @@ public class KafkaTestProducer implements Runnable
   public void close()
   {
     producer.close();
-    if (producer1!=null) {
+    if (producer1 != null) {
       producer1.close();
     }
   }
@@ -174,4 +176,4 @@ public class KafkaTestProducer implements Runnable
   {
     this.ackType = ackType;
   }
-} // End of KafkaTestProducer
\ No newline at end of file
+} // End of KafkaTestProducer