Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:05 UTC

[15/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module
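
For readers skimming the diff: the checkstyle conventions this commit applies include explicit imports grouped by origin instead of wildcard imports, "} catch (...) {" and "} else {" joined on a single line, a space after keywords such as if/for, modifier order "protected abstract" rather than "abstract protected", and no space after a cast. A minimal illustrative sketch of those conventions follows (hypothetical class and method names, not part of the commit itself):

public abstract class CheckstyleExample
{
  // Modifier order: "protected abstract", not "abstract protected".
  protected abstract void processTuple(String input);

  protected int toBlockCount(long length, long blockSize)
  {
    // Space after "if"; no space after a cast.
    if (length % blockSize == 0) {
      return (int)(length / blockSize);
    } else {                             // "} else {" on a single line
      return (int)(length / blockSize) + 1;
    }
  }

  protected void sleepQuietly(long millis)
  {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {   // "} catch (...) {" on a single line
      throw new RuntimeException(e);
    }
  }
}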

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 4172ed4..945c000 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -32,14 +32,13 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Operator.IdleTimeHandler;
-
-import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * This base operator queues input tuples for each window and asynchronously processes them after the window is committed.
@@ -115,12 +114,10 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
     if (execute) {
       try {
         Thread.sleep(spinningTime);
-      }
-      catch (InterruptedException ie) {
+      } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
       }
-    }
-    else {
+    } else {
       logger.error("Exception: ", cause);
       DTThrowable.rethrow(cause.get());
     }
@@ -178,14 +175,14 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
             processCommittedData(output);
             doneTuples.add(output);
           }
-        }
-        catch (Throwable e) {
+        } catch (Throwable e) {
           cause.set(e);
           execute = false;
         }
       }
     };
   }
+
   /**
    * The implementation class should call this method to enqueue output once input is converted to queue input.
    *
@@ -203,7 +200,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
    *
    * @param input
    */
-  abstract protected void processTuple(INPUT input);
+  protected abstract void processTuple(INPUT input);
 
   /**
    * This method is called once the window in which queueTuple was created is committed.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
index a9604e3..0a96418 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
@@ -18,15 +18,16 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import com.datatorrent.api.Stats.OperatorStats;
-import com.datatorrent.lib.counters.BasicCounters;
-
 import java.util.Collection;
 
-import org.apache.commons.lang.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.lib.counters.BasicCounters;
+
 /**
  * This is the base implementation for a file input operator, which scans a directory for files.&nbsp;
  * Files are then read and split into tuples, which are emitted.&nbsp;
@@ -134,7 +135,7 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
     int newOperatorCount;
     int totalFileCount = 0;
 
-    for(Partition<AbstractFileInputOperator<T>> partition : partitions) {
+    for (Partition<AbstractFileInputOperator<T>> partition : partitions) {
       AbstractFileInputOperator<T> oper = partition.getPartitionedInstance();
       totalFileCount += oper.failedFiles.size();
       totalFileCount += oper.pendingFiles.size();
@@ -145,11 +146,10 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
       }
     }
 
-    if(!isInitialParitition) {
+    if (!isInitialParitition) {
       LOG.debug("definePartitions: Total File Count: {}", totalFileCount);
       newOperatorCount = computeOperatorCount(totalFileCount);
-    }
-    else {
+    } else {
       newOperatorCount = partitionCount;
     }
 
@@ -160,13 +160,13 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
   {
     int newOperatorCount = totalFileCount / preferredMaxPendingFilesPerOperator;
 
-    if(totalFileCount % preferredMaxPendingFilesPerOperator > 0) {
+    if (totalFileCount % preferredMaxPendingFilesPerOperator > 0) {
       newOperatorCount++;
     }
-    if(newOperatorCount > partitionCount) {
+    if (newOperatorCount > partitionCount) {
       newOperatorCount = partitionCount;
     }
-    if(newOperatorCount == 0) {
+    if (newOperatorCount == 0) {
       newOperatorCount = 1;
     }
 
@@ -179,17 +179,17 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
   {
     BasicCounters<MutableLong> fileCounters = null;
 
-    for(OperatorStats operatorStats: batchedOperatorStats.getLastWindowedStats()) {
-      if(operatorStats.counters != null) {
-        fileCounters = (BasicCounters<MutableLong>) operatorStats.counters;
+    for (OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
+      if (operatorStats.counters != null) {
+        fileCounters = (BasicCounters<MutableLong>)operatorStats.counters;
       }
     }
 
     Response response = new Response();
 
-    if(fileCounters != null &&
-       fileCounters.getCounter(FileCounters.PENDING_FILES).longValue() > 0L ||
-       System.currentTimeMillis() - repartitionInterval <= lastRepartition) {
+    if (fileCounters != null &&
+        fileCounters.getCounter(FileCounters.PENDING_FILES).longValue() > 0L ||
+        System.currentTimeMillis() - repartitionInterval <= lastRepartition) {
       response.repartitionRequired = false;
       return response;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 48d4ae6..69e44a5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -21,7 +21,11 @@ package com.datatorrent.lib.io.fs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -33,13 +37,14 @@ import javax.annotation.Nullable;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -49,13 +54,16 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.counters.BasicCounters;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Input operator that scans a directory for files and splits a file into blocks.<br/>
@@ -129,8 +137,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
 
     try {
       fs = scanner.getFSInstance();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("creating fs", e);
     }
 
@@ -138,10 +145,10 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
       blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
     }
 
-    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+        idempotentStorageManager.getLargestRecoveryWindow()) {
       blockMetadataIterator = null;
-    }
-    else {
+    } else {
       //don't setup scanner while recovery
       scanner.setup(context);
     }
@@ -153,15 +160,12 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   {
     try {
       scanner.teardown();
-    }
-    catch (Throwable t) {
+    } catch (Throwable t) {
       DTThrowable.rethrow(t);
-    }
-    finally {
+    } finally {
       try {
         fs.close();
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -181,8 +185,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   {
     try {
       @SuppressWarnings("unchecked")
-      LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>) idempotentStorageManager.load(operatorId,
-        windowId);
+      LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)idempotentStorageManager.load(operatorId, windowId);
       if (recoveredData == null) {
         //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
         return;
@@ -193,8 +196,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
       for (FileInfo info : recoveredData) {
         if (info.directoryPath != null) {
           scanner.lastModifiedTimes.put(info.directoryPath, info.modifiedTime);
-        }
-        else { //no directory
+        } else { //no directory
           scanner.lastModifiedTimes.put(info.relativeFilePath, info.modifiedTime);
         }
 
@@ -211,8 +213,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
       if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
         scanner.setup(context);
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("replay", e);
     }
   }
@@ -250,8 +251,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
         if (fileInfo.lastFileOfScan) {
           break;
         }
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("creating metadata", e);
       }
     }
@@ -263,8 +263,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
     if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
       try {
         idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
     }
@@ -280,8 +279,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
     while (blockMetadataIterator.hasNext()) {
       if (blockCount++ < blocksThreshold) {
         this.blocksMetadataOutput.emit(blockMetadataIterator.next());
-      }
-      else {
+      } else {
         return false;
       }
     }
@@ -293,7 +291,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
    * Can be overridden for creating block metadata of a type that extends {@link FileBlockMetadata}
    */
   protected FileBlockMetadata createBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber,
-                                                  FileMetadata fileMetadata, boolean isLast)
+      FileMetadata fileMetadata, boolean isLast)
   {
     return new FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos,
       lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
@@ -321,7 +319,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
     fileMetadata.setFileLength(status.getLen());
 
     if (!status.isDirectory()) {
-      int noOfBlocks = (int) ((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
+      int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
       if (fileMetadata.getDataOffset() >= status.getLen()) {
         noOfBlocks = 0;
       }
@@ -335,7 +333,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   {
     // block ids are 32 bits of operatorId | 32 bits of sequence number
     long[] blockIds = new long[fileMetadata.getNumberOfBlocks()];
-    long longLeftSide = ((long) operatorId) << 32;
+    long longLeftSide = ((long)operatorId) << 32;
     for (int i = 0; i < fileMetadata.getNumberOfBlocks(); i++) {
       blockIds[i] = longLeftSide | sequenceNo++ & 0xFFFFFFFFL;
     }
@@ -392,8 +390,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   {
     try {
       idempotentStorageManager.deleteUpTo(operatorId, l);
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
@@ -671,8 +668,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
       }
       try {
         fs = getFSInstance();
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("opening fs", e);
       }
       scanService.submit(this);
@@ -685,8 +681,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
       scanService.shutdownNow();
       try {
         fs.close();
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("closing fs", e);
       }
     }
@@ -708,13 +703,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
               scan(new Path(afile), null);
             }
             scanComplete();
-          }
-          else {
+          } else {
             Thread.sleep(sleepMillis);
           }
         }
-      }
-      catch (Throwable throwable) {
+      } catch (Throwable throwable) {
         LOG.error("service", throwable);
         running = false;
         atomicThrowable.set(throwable);
@@ -776,29 +769,25 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
             LOG.debug("found {}", childPathStr);
 
             FileInfo info;
-            if(rootPath == null) {
-             info =parentStatus.isDirectory() ?
-                new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) :
-                new FileInfo(null, childPathStr, parentStatus.getModificationTime());
-            }
-            else {
+            if (rootPath == null) {
+              info = parentStatus.isDirectory() ?
+                  new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) :
+                  new FileInfo(null, childPathStr, parentStatus.getModificationTime());
+            } else {
               URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
               info = new FileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
-                parentStatus.getModificationTime());
+                  parentStatus.getModificationTime());
             }
 
             discoveredFiles.add(info);
-          }
-          else {
+          } else {
             // don't look at it again
             ignoredFiles.add(childPathStr);
           }
         }
-      }
-      catch (FileNotFoundException fnf) {
+      } catch (FileNotFoundException fnf) {
         LOG.warn("Failed to list directory {}", filePath, fnf);
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("listing files", e);
       }
     }
@@ -813,7 +802,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
      * @throws IOException
      */
     protected boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime,
-                               Long lastModificationTime) throws IOException
+        Long lastModificationTime) throws IOException
     {
       return (!(lastModificationTime == null || modificationTime > lastModificationTime));
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
index 595abde..35530a3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
@@ -44,7 +44,7 @@ public interface FilterStreamContext<F extends FilterOutputStream>
    * Base filter context that can be extended to build custom filters.
    * @param <F> The Filter output stream
    */
-  public static abstract class BaseFilterStreamContext<F extends FilterOutputStream> implements FilterStreamContext<F>
+  abstract class BaseFilterStreamContext<F extends FilterOutputStream> implements FilterStreamContext<F>
   {
     protected transient F filterStream;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
index 58b51af..75e6e5f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
@@ -21,7 +21,12 @@ package com.datatorrent.lib.io.fs;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Maps;
 
@@ -37,7 +42,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
   
   public void reclaimFilterStreamContext(FilterStreamContext<F> filterStreamContext);
   
-  public static abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
+  abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
   {
 
     private transient Map<OutputStream, FilterStreamContext<F>> reusableContexts = Maps.newHashMap();
@@ -112,7 +117,9 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
       }
     }
 
-    private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext implements FilterStreamContext {
+    private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext
+        implements FilterStreamContext
+    {
       
       private List<FilterStreamContext<?>> streamContexts = new ArrayList<FilterStreamContext<?>>();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
index 4ac03a6..f2e9a8c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
@@ -22,10 +22,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
@@ -261,6 +262,7 @@ public class TailFsInputOperator implements InputOperator, ActivationListener<Op
         try {
           Thread.sleep(delay);
         } catch (InterruptedException e) {
+          //swallowing exception
         }
         --localCounter;
       }
@@ -286,7 +288,7 @@ public class TailFsInputOperator implements InputOperator, ActivationListener<Op
     }
     accessTime = System.currentTimeMillis();
     while ((ch = reader.read()) != -1) {
-      readChar = (char) ch;
+      readChar = (char)ch;
       if (readChar != delimiter) {
         sb.append(readChar);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java b/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
index 2009d61..872a618 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
@@ -20,4 +20,4 @@
  * Library of input operators for writing into file streams and output operators for reading from file streams.
  * The file I/O operators interact with entities outside of DAG, and at times outside of Hadoop
  */
-package com.datatorrent.lib.io.fs;
\ No newline at end of file
+package com.datatorrent.lib.io.fs;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
index 617b397..bac0816 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
@@ -18,18 +18,22 @@
  */
 package com.datatorrent.lib.io.jms;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
 /**
  * This is the base implementation of an JMS output operator.&nbsp;
  * A concrete operator should be created from this skeleton implementation.
@@ -93,8 +97,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
 
     try {
       createConnection();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       logger.debug(ex.getLocalizedMessage());
       throw new RuntimeException(ex);
     }
@@ -103,8 +106,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
 
     try {
       store.connect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -112,7 +114,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
 
     mode = context.getValue(OperatorContext.PROCESSING_MODE);
 
-    if(mode==ProcessingMode.AT_MOST_ONCE){
+    if (mode == ProcessingMode.AT_MOST_ONCE) {
       //Batch must be cleared to avoid writing same data twice
       tupleBatch.clear();
     }
@@ -135,8 +137,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
     logger.debug("beginning teardown");
     try {
       store.disconnect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -161,7 +162,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
   {
     logger.debug("Ending window {}", currentWindowId);
 
-    if(store.isExactlyOnce()) {
+    if (store.isExactlyOnce()) {
       //Store committed window and data in same transaction
       if (committedWindowId < currentWindowId) {
         store.storeCommittedWindowId(appId, operatorId, currentWindowId);
@@ -170,8 +171,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
 
       flushBatch();
       store.commitTransaction();
-    }
-    else {
+    } else {
       //For transactionable stores which cannot support exactly once, At least
       //once can be insured by for storing the data and then the committed window
       //id.
@@ -194,11 +194,10 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
   {
     logger.debug("flushing batch, batch size {}", tupleBatch.size());
 
-    for (Message message: messageBatch) {
+    for (Message message : messageBatch) {
       try {
         producer.send(message);
-      }
-      catch (JMSException ex) {
+      } catch (JMSException ex) {
         throw new RuntimeException(ex);
       }
     }
@@ -215,7 +214,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
    */
   protected void sendMessage(Object data)
   {
-    if(currentWindowId <= committedWindowId) {
+    if (currentWindowId <= committedWindowId) {
       return;
     }
 
@@ -249,8 +248,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
       producer = null;
 
       super.cleanup();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       logger.error(null, ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
index c7ffed3..efda6b0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
@@ -18,10 +18,11 @@
  */
 package com.datatorrent.lib.io.jms;
 
-import com.datatorrent.api.DefaultInputPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultInputPort;
+
 /**
  * This is the base implementation of a single port JMS output operator.&nbsp;
  * A concrete operator should be created from this skeleton implementation.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
index 61f1eb7..31eaf18 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
@@ -18,15 +18,22 @@
  */
 package com.datatorrent.lib.io.jms;
 
-import com.datatorrent.api.annotation.Stateless;
 import java.io.IOException;
+
 import javax.jms.JMSException;
 import javax.validation.constraints.NotNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.annotation.Stateless;
+
 /**
  * This is a JMS store which stores committed window ids in a file. This is not a true
  * transactionable store because there is a chance that a failure may occur in between storing the
@@ -85,9 +92,8 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
   {
     FileSystem tempFS = FileSystem.newInstance(new Path(recoveryDirectory).toUri(), new Configuration());
 
-    if(tempFS instanceof LocalFileSystem)
-    {
-      tempFS = ((LocalFileSystem) tempFS).getRaw();
+    if (tempFS instanceof LocalFileSystem) {
+      tempFS = ((LocalFileSystem)tempFS).getRaw();
     }
 
     return tempFS;
@@ -118,12 +124,10 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
 
     try {
       //No committed window stored, return negative invalid window.
-      if(!fs.exists(recoveryPath))
-      {
+      if (!fs.exists(recoveryPath)) {
         return Stateless.WINDOW_ID;
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -132,16 +136,15 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
     try {
       FileStatus[] windowFiles = fs.listStatus(recoveryPath);
 
-      for(FileStatus fileStatus: windowFiles) {
+      for (FileStatus fileStatus : windowFiles) {
         String windowString = fileStatus.getPath().getName();
         long tempWindow = Long.parseLong(windowString);
 
-        if(maxWindow < tempWindow) {
+        if (maxWindow < tempWindow) {
           maxWindow = tempWindow;
         }
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -159,14 +162,13 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
       fs.create(windowPath);
       FileStatus[] windowFiles = fs.listStatus(recoveryPath);
 
-      for(FileStatus fileStatus: windowFiles) {
+      for (FileStatus fileStatus : windowFiles) {
         Path tempPath = fileStatus.getPath();
-        if(!tempPath.getName().equals(windowString)) {
+        if (!tempPath.getName().equals(windowString)) {
           fs.delete(tempPath, true);
         }
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -175,10 +177,8 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
   public void removeCommittedWindowId(String appId, int operatorId)
   {
     try {
-      fs.delete(getOperatorRecoveryPath(appId, operatorId).getParent(),
-                true);
-    }
-    catch (IOException ex) {
+      fs.delete(getOperatorRecoveryPath(appId, operatorId).getParent(), true);
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -194,8 +194,7 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
   {
     try {
       this.getBase().getSession().commit();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -207,8 +206,7 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
   {
     try {
       this.getBase().getSession().rollback();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -249,13 +247,9 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
     return false;
   }
 
-  private Path getOperatorRecoveryPath(String appId,
-                                       int operatorId)
+  private Path getOperatorRecoveryPath(String appId, int operatorId)
   {
-    return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" +
-                         appId + "/" +
-                         operatorId + "/" +
-                         COMMITTED_WINDOW_DIR);
+    return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR);
   }
 
   /**
@@ -265,14 +259,9 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
    * @param windowId The id of the current window.
    * @return The path where the windowId is stored.
    */
-  private Path getOperatorWindowRecoveryPath(String appId,
-                                       int operatorId,
-                                       long windowId)
+  private Path getOperatorWindowRecoveryPath(String appId, int operatorId, long windowId)
   {
-    return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" +
-                         appId + "/" +
-                         operatorId + "/" +
-                         COMMITTED_WINDOW_DIR + "/" +
-                         windowId);
+    return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR + "/" +
+        windowId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
index 6db6a4d..48ed2c3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
@@ -315,17 +315,13 @@ public class JMSBase
   {
     if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
       return Session.CLIENT_ACKNOWLEDGE;
-    }
-    else if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
+    } else if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
       return Session.AUTO_ACKNOWLEDGE;
-    }
-    else if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
+    } else if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
       return Session.DUPS_OK_ACKNOWLEDGE;
-    }
-    else if ("SESSION_TRANSACTED".equals(ackMode)) {
+    } else if ("SESSION_TRANSACTED".equals(ackMode)) {
       return Session.SESSION_TRANSACTED;
-    }
-    else {
+    } else {
       return Session.CLIENT_ACKNOWLEDGE; // default
     }
   }
@@ -372,8 +368,7 @@ public class JMSBase
       BeanUtils.populate(cf, connectionFactoryProperties);
       logger.debug("creation successful.");
       return cf;
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new RuntimeException("Failed to create connection factory.", e);
     }
   }
@@ -388,8 +383,7 @@ public class JMSBase
       connection.close();
       session = null;
       connection = null;
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       logger.debug(ex.getLocalizedMessage());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
index 9caa833..3bb8cb9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
@@ -18,14 +18,20 @@
  */
 package com.datatorrent.lib.io.jms;
 
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import java.io.Serializable;
 import java.util.Map;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
 /**
  * @since 2.1.0
  */
@@ -108,28 +114,22 @@ public class JMSMultiPortOutputOperator extends AbstractJMSOutputOperator
     try {
       if (tuple instanceof Message) {
         return (Message)tuple;
-      }
-      else if (tuple instanceof String) {
+      } else if (tuple instanceof String) {
         return getSession().createTextMessage((String)tuple);
-      }
-      else if (tuple instanceof byte[]) {
+      } else if (tuple instanceof byte[]) {
         BytesMessage message = getSession().createBytesMessage();
         message.writeBytes((byte[])tuple);
         return message;
-      }
-      else if (tuple instanceof Map) {
+      } else if (tuple instanceof Map) {
         return createMessageForMap((Map)tuple);
-      }
-      else if (tuple instanceof Serializable) {
+      } else if (tuple instanceof Serializable) {
         return getSession().createObjectMessage((Serializable)tuple);
-      }
-      else {
+      } else {
         throw new RuntimeException("Cannot convert object of type "
-                + tuple.getClass() + "] to JMS message. Supported message "
-                + "payloads are: String, byte array, Map<String,?>, Serializable object.");
+            + tuple.getClass() + "] to JMS message. Supported message "
+            + "payloads are: String, byte array, Map<String,?>, Serializable object.");
       }
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       logger.error(ex.getLocalizedMessage());
       throw new RuntimeException(ex);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
index aa68802..0bc0c79 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
@@ -18,15 +18,24 @@
  */
 package com.datatorrent.lib.io.jms;
 
-import com.datatorrent.api.DefaultOutputPort;
 import java.io.Serializable;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultOutputPort;
+
 /**
  * An implementation of AbstractJMSInputOperator which emits TextMessage,StreamMessage,BytesMessage,MapMessage
  * and ObjectMessage on their respective ports.
@@ -54,20 +63,15 @@ public class JMSObjectInputOperator extends AbstractJMSInputOperator<Object>
   {
     if (message instanceof TextMessage) {
       return ((TextMessage)message).getText();
-    }
-    else if (message instanceof StreamMessage) {
+    } else if (message instanceof StreamMessage) {
       return ((StreamMessage)message).readString();
-    }
-    else if (message instanceof BytesMessage) {
+    } else if (message instanceof BytesMessage) {
       return extractByteArrayFromMessage((BytesMessage)message);
-    }
-    else if (message instanceof MapMessage) {
+    } else if (message instanceof MapMessage) {
       return extractMapFromMessage((MapMessage)message);
-    }
-    else if (message instanceof ObjectMessage) {
+    } else if (message instanceof ObjectMessage) {
       return extractSerializableFromMessage((ObjectMessage)message);
-    }
-    else {
+    } else {
       return message;
     }
   }
@@ -122,19 +126,16 @@ public class JMSObjectInputOperator extends AbstractJMSInputOperator<Object>
   {
     if (outputString.isConnected()) {
       outputString.emit((String)payload);
-    }
-    else if (outputMap.isConnected()) {
+    } else if (outputMap.isConnected()) {
       outputMap.emit((Map<String, Object>)payload);
-    }
-    else if (outputBytes.isConnected()) {
+    } else if (outputBytes.isConnected()) {
       outputBytes.emit((byte[])payload);
-    }
-    else {
+    } else {
       output.emit(payload);
     }
   }
 
   @SuppressWarnings("unused")
-  private static transient final Logger logger = LoggerFactory.getLogger(JMSObjectInputOperator.class);
+  private static final transient Logger logger = LoggerFactory.getLogger(JMSObjectInputOperator.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
index 4c5c265..11b8447 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
@@ -20,7 +20,14 @@ package com.datatorrent.lib.io.jms;
 
 import java.io.IOException;
 import java.util.Enumeration;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +67,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     try {
 
       beginTransaction();
-      BytesMessage message = (BytesMessage) consumer.receive();
+      BytesMessage message = (BytesMessage)consumer.receive();
       logger.debug("Retrieved committed window message id {}", message.getJMSMessageID());
       long windowId = message.readLong();
 
@@ -71,8 +78,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
 
       logger.debug("Retrieved windowId {}", windowId);
       return windowId;
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -80,20 +86,19 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   @Override
   public void storeCommittedWindowId(String appId, int operatorId, long windowId)
   {
-    if(!inTransaction) {
+    if (!inTransaction) {
       throw new RuntimeException("This should be called while you are in an existing transaction");
     }
 
     logger.debug("storing window appId {} operatorId {} windowId {}",
-                 appId, operatorId, windowId);
+        appId, operatorId, windowId);
     try {
       removeCommittedWindowId(appId, operatorId);
       BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage();
       bytesMessage.writeLong(windowId);
       producer.send(bytesMessage);
       logger.debug("Retrieved committed window message id {}", bytesMessage.getJMSMessageID());
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -103,8 +108,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   {
     try {
       consumer.receive();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -114,8 +118,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   {
     logger.debug("beginning transaction");
 
-    if(inTransaction)
-    {
+    if (inTransaction) {
       throw new RuntimeException("Cannot start a transaction twice.");
     }
 
@@ -127,15 +130,13 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   {
     logger.debug("committing transaction.");
 
-    if(!inTransaction)
-    {
+    if (!inTransaction) {
       throw new RuntimeException("Cannot commit a transaction if you are not in one.");
     }
 
     try {
       getBase().getSession().commit();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -146,12 +147,9 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   @Override
   public void rollbackTransaction()
   {
-    try
-    {
+    try {
       getBase().getSession().rollback();
-    }
-    catch (JMSException ex)
-    {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -168,12 +166,11 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     logger.debug("Entering connect. is in transaction: {}", inTransaction);
 
     try {
-      String queueName = getQueueName(getAppId(),
-                                      getOperatorId());
+      String queueName = getQueueName(getAppId(), getOperatorId());
 
       logger.debug("Base is null: {}", getBase() == null);
 
-      if(getBase() != null) {
+      if (getBase() != null) {
         logger.debug("Session is null: {}", getBase().getSession() == null);
       }
 
@@ -184,8 +181,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
       try {
         Enumeration enumeration = browser.getEnumeration();
         hasStore = enumeration.hasMoreElements();
-      }
-      catch (JMSException ex) {
+      } catch (JMSException ex) {
         throw new RuntimeException(ex);
       }
 
@@ -195,15 +191,14 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
       connected = true;
       logger.debug("Connected. is in transaction: {}", inTransaction);
 
-      if(!hasStore) {
+      if (!hasStore) {
         beginTransaction();
         BytesMessage message = getBase().getSession().createBytesMessage();
         message.writeLong(-1L);
         producer.send(message);
         commitTransaction();
       }
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -217,8 +212,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     try {
       producer.close();
       consumer.close();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java b/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
index 49768c1..ad4f163 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
@@ -20,4 +20,4 @@
  * Library of input operators for writing into jms broker and output operators for reading from jms broker.
  * The jms operators interact with entities outside of DAG, and at times outside of Hadoop
  */
-package com.datatorrent.lib.io.jms;
\ No newline at end of file
+package com.datatorrent.lib.io.jms;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
index 5883b98..33bdbaa 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
@@ -24,14 +24,16 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import com.datatorrent.common.util.BaseOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator parses apache logs one line at a time (each tuple is a log line), using the given regex.&nbsp;
@@ -83,8 +85,7 @@ public class ApacheLogParseMapOutputOperator extends BaseOperator
     {
       try {
         processTuple(s);
-      }
-      catch (ParseException ex) {
+      } catch (ParseException ex) {
         throw new RuntimeException("Could not parse the input string", ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
index 938a927..f1dffe8 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
@@ -22,11 +22,11 @@ import java.text.ParseException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Parse Apache log lines one line at a time.&nbsp;
@@ -58,116 +58,115 @@ import com.datatorrent.api.annotation.Stateless;
  * @since 0.3.3
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class ApacheLogParseOperator extends BaseOperator
 {
   /**
    * This is the input port which receives apache log lines.
    */
-	public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
-	{
-		@Override
-		public void process(String s)
-		{
-			try {
-				processTuple(s);
-			} catch (ParseException ex) {
-				// ignore
-			}
-		}
-	};
+  public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String s)
+    {
+      try {
+        processTuple(s);
+      } catch (ParseException ex) {
+        // ignore
+      }
+    }
+  };
 
-	/**
-	 * Client IP address, output port.
-	 */
-	public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
+  /**
+   * Client IP address, output port.
+   */
+  public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
 
-	/**
-	 * Access url port, output port.
-	 */
-	public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
+  /**
+   * Access url port, output port.
+   */
+  public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
 
-	/**
-	 * Apache status log, output port.
-	 */
-	public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
+  /**
+   * Apache status log, output port.
+   */
+  public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
 
-	/**
-	 * Number of bytes served, output port.
-	 */
-	public final transient DefaultOutputPort<Long> outputBytes = new DefaultOutputPort<Long>();
+  /**
+   * Number of bytes served, output port.
+   */
+  public final transient DefaultOutputPort<Long> outputBytes = new DefaultOutputPort<Long>();
 
-	/**
-	 * Referer name, output port.
-	 */
-	public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
+  /**
+   * Referer name, output port.
+   */
+  public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
 
-	/**
-	 * IP Agent, output port.
-	 */
-	public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
+  /**
+   * IP Agent, output port.
+   */
+  public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
 
-	/**
-	 * Get apache log pattern regex.
-	 * @return regex string.
-	 */
-	protected static String getAccessLogRegex()
-	{
-		String regex1 = "^([\\d\\.]+)"; // Client IP
-		String regex2 = " (\\S+)"; // -
-		String regex3 = " (\\S+)"; // -
-		String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
-		String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
-		String regex6 = " (\\d{3})"; // HTTP code
-		String regex7 = " (\\d+)"; // Number of bytes
-		String regex8 = " \"([^\"]+)\""; // Referer
-		String regex9 = " \"([^\"]+)\""; // Agent
-		String regex10 = ".*"; // ignore the rest
-		return regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7
-				+ regex8 + regex9 + regex10;
-	}
+  /**
+   * Get apache log pattern regex.
+   * @return regex string.
+   */
+  protected static String getAccessLogRegex()
+  {
+    String regex1 = "^([\\d\\.]+)"; // Client IP
+    String regex2 = " (\\S+)"; // -
+    String regex3 = " (\\S+)"; // -
+    String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
+    String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
+    String regex6 = " (\\d{3})"; // HTTP code
+    String regex7 = " (\\d+)"; // Number of bytes
+    String regex8 = " \"([^\"]+)\""; // Referer
+    String regex9 = " \"([^\"]+)\""; // Agent
+    String regex10 = ".*"; // ignore the rest
+    return regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7
+        + regex8 + regex9 + regex10;
+  }
 
-	/**
-	 * Parses Apache combined access log, and prints out the following <br>
-	 * 1. Requester IP <br>
-	 * 2. Date of Request <br>
-	 * 3. Requested Page Path
-	 *
-	 * @param line
-	 *          : tuple to parsee
-	 * @throws ParseException
-	 */
-	public void processTuple(String line) throws ParseException
-	{
-		// Apapche log attaributes on each line.
-		String url;
-		String httpStatusCode;
-		long numOfBytes;
-		String referer;
-		String agent;
-		String ipAddr;
+  /**
+   * Parses Apache combined access log, and prints out the following <br>
+   * 1. Requester IP <br>
+   * 2. Date of Request <br>
+   * 3. Requested Page Path
+   *
+   * @param line
+   *          : tuple to parsee
+   * @throws ParseException
+   */
+  public void processTuple(String line) throws ParseException
+  {
+    // Apache log attributes on each line.
+    String url;
+    String httpStatusCode;
+    long numOfBytes;
+    String referer;
+    String agent;
+    String ipAddr;
 
-		// Parse each log line.
-		Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(),
-				Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
-		Matcher accessLogEntryMatcher;
-		accessLogEntryMatcher = accessLogPattern.matcher(line);
+    // Parse each log line.
+    Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(),
+        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+    Matcher accessLogEntryMatcher;
+    accessLogEntryMatcher = accessLogPattern.matcher(line);
 
-		if (accessLogEntryMatcher.matches()) {
-			// System.out.println("MATCHED!");
-			ipAddr = accessLogEntryMatcher.group(1);
-			url = accessLogEntryMatcher.group(5);
-			httpStatusCode = accessLogEntryMatcher.group(6);
-			numOfBytes = Long.parseLong(accessLogEntryMatcher.group(7));
-			referer = accessLogEntryMatcher.group(8);
-			agent = accessLogEntryMatcher.group(9);
+    if (accessLogEntryMatcher.matches()) {
+      ipAddr = accessLogEntryMatcher.group(1);
+      url = accessLogEntryMatcher.group(5);
+      httpStatusCode = accessLogEntryMatcher.group(6);
+      numOfBytes = Long.parseLong(accessLogEntryMatcher.group(7));
+      referer = accessLogEntryMatcher.group(8);
+      agent = accessLogEntryMatcher.group(9);
 
-			outputIPAddress.emit(ipAddr);
-			outputUrl.emit(url);
-			outputStatusCode.emit(httpStatusCode);
-			outputBytes.emit(numOfBytes);
-			outputReferer.emit(referer);
-			outputAgent.emit(agent);
-		}
-	}
+      outputIPAddress.emit(ipAddr);
+      outputUrl.emit(url);
+      outputStatusCode.emit(httpStatusCode);
+      outputBytes.emit(numOfBytes);
+      outputReferer.emit(referer);
+      outputAgent.emit(agent);
+    }
+  }
 }
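
For reference, the combined-log pattern assembled by getAccessLogRegex() above can be exercised outside the operator. The following standalone sketch is not part of the commit; the class name and the sample log line are made up for illustration, and it simply shows which capture groups processTuple() reads:

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  // Sketch only: the same pattern that getAccessLogRegex() builds, applied to a
  // hypothetical Apache combined-log line.
  public class AccessLogRegexSketch
  {
    public static void main(String[] args)
    {
      String regex = "^([\\d\\.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"
          + " \"[A-Z]+ (.+?) HTTP/\\S+\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\".*";
      String line = "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700]"
          + " \"GET /apache_pb.gif HTTP/1.0\" 200 2326"
          + " \"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"";
      Matcher m = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL).matcher(line);
      if (m.matches()) {
        System.out.println("ip      = " + m.group(1));                 // 127.0.0.1
        System.out.println("url     = " + m.group(5));                 // /apache_pb.gif
        System.out.println("status  = " + m.group(6));                 // 200
        System.out.println("bytes   = " + Long.parseLong(m.group(7))); // 2326
        System.out.println("referer = " + m.group(8));
        System.out.println("agent   = " + m.group(9));
      }
    }
  }

Each printed value corresponds to one of the operator's output ports (outputIPAddress, outputUrl, outputStatusCode, outputBytes, outputReferer, outputAgent).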

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
index 3861639..1ba555e 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
@@ -18,11 +18,6 @@
  */
 package com.datatorrent.lib.logs;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.HashMap;
@@ -30,6 +25,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * Parse Apache log lines one line at a time.&nbsp;
  * Regex (getAccessLogRegex) is used as a parser.&nbsp;
@@ -47,146 +47,154 @@ import java.util.regex.Pattern;
  * @since 0.3.2
  */
 @Stateless
-public class ApacheVirtualLogParseOperator extends BaseOperator {
-
-    // default date format
-    protected static final String dateFormat = "dd/MMM/yyyy:HH:mm:ss Z";
-    /**
-     *
-     */
-    public final transient DefaultInputPort<String> data = new DefaultInputPort<String>() {
-        @Override
-        public void process(String s) {
-            try {
-                processTuple(s);
-            } catch (ParseException ex) {
-                // ignore
-            }
-        }
-    };
-
-    /**
-     * This output port emits the IPAddresses contained in log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
-    /**
-     * This output port emits URLs contained in log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
-    /**
-     * This output port emits status codes contained in log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
-    /**
-     * This output pot emits a Map for each log file line,
-     * which contains all the information extracted from the log file line.
-     */
-    public final transient DefaultOutputPort<Map<String, Integer>> outputBytes = new DefaultOutputPort<Map<String, Integer>>();
-    /**
-     * This output port emits the referers contained in the log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
-    /**
-     * This output port emits the agents contained in the log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
-    /**
-     * This output port emits the servernames contained in the log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputServerName = new DefaultOutputPort<String>();
-    /**
-     * This output port emits the servernames contained in the log file lines.
-     */
-    public final transient DefaultOutputPort<String> outputServerName1 = new DefaultOutputPort<String>();
-    /**
-     * This output port emits the status codes corresponding to each url in a log file line.
-     */
-    public final transient DefaultOutputPort<Map<String, String>> outUrlStatus = new DefaultOutputPort<Map<String, String>>();
-    /**
-     * This output port emits the status associated with each server in a log file line.
-     */
-    public final transient DefaultOutputPort<Map<String, String>> outServerStatus = new DefaultOutputPort<Map<String, String>>();
-    /**
-     * This output port emits client data usage contained in log file lines.
-     */
-    public final transient DefaultOutputPort<Integer> clientDataUsage = new DefaultOutputPort<Integer>();
-    /**
-     * This output port emits the view counts contained in log file lines.
-     */
-    public final transient DefaultOutputPort<Integer> viewCount = new DefaultOutputPort<Integer>();
-
-    protected static String getAccessLogRegex() {
-    	  String regex0 = "^([^\"]+)";
-        String regex1 = " ([\\d\\.]+)";                         // Client IP
-        String regex2 = " (\\S+)";                             // -
-        String regex3 = " (\\S+)";                             // -
-        String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
-        String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\"";                       //  url
-        String regex6 = " (\\d{3})";                           // HTTP code
-        String regex7 = " (\\d+)";                     // Number of bytes
-        String regex8 = " \"([^\"]+)\"";                 // Referer
-        String regex9 = " \"([^\"]+)\"";                // Agent
-        String regex10 = ".*"; // ignore the rest
-        return regex0 + regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7 + regex8 + regex9 + regex10;
+public class ApacheVirtualLogParseOperator extends BaseOperator
+{
+
+  // default date format
+  protected static final String dateFormat = "dd/MMM/yyyy:HH:mm:ss Z";
+  /**
+   * This input port receives apache access log lines, one line per tuple.
+   */
+  public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String s)
+    {
+      try {
+        processTuple(s);
+      } catch (ParseException ex) {
+        // ignore
+      }
     }
-
-    /**
-     * Parses Apache combined access log, and prints out the following <br>1.
-     * Requester IP <br>2. Date of Request <br>3. Requested Page Path
-     *
-     * @param line : tuple to parsee
-     * @throws ParseException
-     * @throws IOException
-     */
-    public void processTuple(String line) throws ParseException {
-
-    	  // Apache log properties.
-        String url;
-        String httpStatusCode;
-        long numOfBytes;
-        String referer;
-        String agent;
-        String ipAddr;
-        String serverName;
-
-        // Parser log.
-        Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(), Pattern.CASE_INSENSITIVE
-                | Pattern.DOTALL);
-        Matcher accessLogEntryMatcher;
-        accessLogEntryMatcher = accessLogPattern.matcher(line);
-
-        if (accessLogEntryMatcher.matches()) {
-
-        	  serverName = accessLogEntryMatcher.group(1);
-            ipAddr = accessLogEntryMatcher.group(2);
-            url = accessLogEntryMatcher.group(6);
-            httpStatusCode = accessLogEntryMatcher.group(7);
-            numOfBytes = Long.parseLong(accessLogEntryMatcher.group(8));
-            referer = accessLogEntryMatcher.group(9);
-            agent = accessLogEntryMatcher.group(10);
-
-            outputIPAddress.emit(ipAddr);
-            outputUrl.emit(url);
-            outputStatusCode.emit(httpStatusCode);
-            Map<String, Integer> ipdata = new HashMap<String, Integer>();
-            ipdata.put(ipAddr, (int)numOfBytes);
-            outputBytes.emit(ipdata);
-            outputReferer.emit(referer);
-            outputAgent.emit(agent);
-            outputServerName.emit(serverName);
-            outputServerName1.emit(serverName);
-
-            HashMap<String, String> urlStatus = new HashMap<String, String>();
-            urlStatus.put(url, httpStatusCode);
-            outUrlStatus.emit(urlStatus);
-
-            HashMap<String, String> serverStatus = new HashMap<String, String>();
-            serverStatus.put(serverName, httpStatusCode);
-            outServerStatus.emit(serverStatus);
-
-            clientDataUsage.emit((int)numOfBytes);
-            viewCount.emit(new Integer(1));
-        }
+  };
+
+  /**
+   * This output port emits the IPAddresses contained in log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
+  /**
+   * This output port emits URLs contained in log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
+  /**
+   * This output port emits status codes contained in log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
+  /**
+   * This output port emits a Map for each log file line,
+   * which contains all the information extracted from the log file line.
+   */
+  public final transient DefaultOutputPort<Map<String, Integer>> outputBytes =
+      new DefaultOutputPort<Map<String, Integer>>();
+  /**
+   * This output port emits the referers contained in the log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
+  /**
+   * This output port emits the agents contained in the log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
+  /**
+   * This output port emits the servernames contained in the log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputServerName = new DefaultOutputPort<String>();
+  /**
+   * This output port emits the servernames contained in the log file lines.
+   */
+  public final transient DefaultOutputPort<String> outputServerName1 = new DefaultOutputPort<String>();
+  /**
+   * This output port emits the status codes corresponding to each url in a log file line.
+   */
+  public final transient DefaultOutputPort<Map<String, String>> outUrlStatus =
+      new DefaultOutputPort<Map<String, String>>();
+  /**
+   * This output port emits the status associated with each server in a log file line.
+   */
+  public final transient DefaultOutputPort<Map<String, String>> outServerStatus =
+      new DefaultOutputPort<Map<String, String>>();
+  /**
+   * This output port emits client data usage contained in log file lines.
+   */
+  public final transient DefaultOutputPort<Integer> clientDataUsage = new DefaultOutputPort<Integer>();
+  /**
+   * This output port emits the view counts contained in log file lines.
+   */
+  public final transient DefaultOutputPort<Integer> viewCount = new DefaultOutputPort<Integer>();
+
+  protected static String getAccessLogRegex()
+  {
+    String regex0 = "^([^\"]+)";
+    String regex1 = " ([\\d\\.]+)";                         // Client IP
+    String regex2 = " (\\S+)";                             // -
+    String regex3 = " (\\S+)";                             // -
+    String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
+    String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\"";                       //  url
+    String regex6 = " (\\d{3})";                           // HTTP code
+    String regex7 = " (\\d+)";                     // Number of bytes
+    String regex8 = " \"([^\"]+)\"";                 // Referer
+    String regex9 = " \"([^\"]+)\"";                // Agent
+    String regex10 = ".*"; // ignore the rest
+    return regex0 + regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7 + regex8 + regex9 + regex10;
+  }
+
+  /**
+   * Parses Apache combined access log, and prints out the following <br>1.
+   * Requester IP <br>2. Date of Request <br>3. Requested Page Path
+   *
+   * @param line : tuple to parse
+   * @throws ParseException
+   * @throws IOException
+   */
+  public void processTuple(String line) throws ParseException
+  {
+
+    // Apache log properties.
+    String url;
+    String httpStatusCode;
+    long numOfBytes;
+    String referer;
+    String agent;
+    String ipAddr;
+    String serverName;
+
+    // Parser log.
+    Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(), Pattern.CASE_INSENSITIVE
+        | Pattern.DOTALL);
+    Matcher accessLogEntryMatcher;
+    accessLogEntryMatcher = accessLogPattern.matcher(line);
+
+    if (accessLogEntryMatcher.matches()) {
+
+      serverName = accessLogEntryMatcher.group(1);
+      ipAddr = accessLogEntryMatcher.group(2);
+      url = accessLogEntryMatcher.group(6);
+      httpStatusCode = accessLogEntryMatcher.group(7);
+      numOfBytes = Long.parseLong(accessLogEntryMatcher.group(8));
+      referer = accessLogEntryMatcher.group(9);
+      agent = accessLogEntryMatcher.group(10);
+
+      outputIPAddress.emit(ipAddr);
+      outputUrl.emit(url);
+      outputStatusCode.emit(httpStatusCode);
+      Map<String, Integer> ipdata = new HashMap<String, Integer>();
+      ipdata.put(ipAddr, (int)numOfBytes);
+      outputBytes.emit(ipdata);
+      outputReferer.emit(referer);
+      outputAgent.emit(agent);
+      outputServerName.emit(serverName);
+      outputServerName1.emit(serverName);
+
+      HashMap<String, String> urlStatus = new HashMap<String, String>();
+      urlStatus.put(url, httpStatusCode);
+      outUrlStatus.emit(urlStatus);
+
+      HashMap<String, String> serverStatus = new HashMap<String, String>();
+      serverStatus.put(serverName, httpStatusCode);
+      outServerStatus.emit(serverStatus);
+
+      clientDataUsage.emit((int)numOfBytes);
+      viewCount.emit(new Integer(1));
     }
+  }
 }
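
The virtual-host variant above differs from the plain combined-log parser only in the leading "^([^\"]+)" group, which captures the server name before the client IP and shifts every later capture group up by one (url becomes group 6, status group 7, and so on). A minimal sketch, using made-up values for one parsed line, of the per-line maps that processTuple() builds and emits on outputBytes, outUrlStatus and outServerStatus:

  import java.util.HashMap;
  import java.util.Map;

  // Sketch only: mirrors the map-building in processTuple() for one hypothetical
  // virtual-host log line "web01.example.com 10.0.0.5 ... 200 512 ...".
  public class VirtualLogMapsSketch
  {
    public static void main(String[] args)
    {
      String serverName = "web01.example.com";   // group 1
      String ipAddr = "10.0.0.5";                // group 2
      String url = "/index.html";                // group 6
      String httpStatusCode = "200";             // group 7
      long numOfBytes = 512;                     // group 8

      Map<String, Integer> ipdata = new HashMap<String, Integer>();
      ipdata.put(ipAddr, (int)numOfBytes);          // emitted on outputBytes

      Map<String, String> urlStatus = new HashMap<String, String>();
      urlStatus.put(url, httpStatusCode);           // emitted on outUrlStatus

      Map<String, String> serverStatus = new HashMap<String, String>();
      serverStatus.put(serverName, httpStatusCode); // emitted on outServerStatus

      System.out.println(ipdata + " " + urlStatus + " " + serverStatus);
    }
  }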
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java b/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
index 2c6ad20..6c56b11 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
@@ -43,8 +43,10 @@ public class DimensionAggregationUnifier implements Operator
 
   private Map<String, Map<String, MutableDouble>> dataMap = new HashMap<String, Map<String, MutableDouble>>();
 
-  public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<Map<String, DimensionObject<String>>>();
-  public final transient DefaultInputPort<Map<String, DimensionObject<String>>> input = new DefaultInputPort<Map<String, DimensionObject<String>>>() {
+  public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<>();
+
+  public final transient DefaultInputPort<Map<String, DimensionObject<String>>> input = new DefaultInputPort<Map<String, DimensionObject<String>>>()
+  {
 
     @Override
     public void process(Map<String, DimensionObject<String>> tuple)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java b/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
index f6c402a..cba895f 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
@@ -35,7 +35,8 @@ public class DimensionObject<T> implements Comparable<DimensionObject<T>>
   private T val;
 
   @SuppressWarnings("unused")
-  private DimensionObject(){
+  private DimensionObject()
+  {
 
   }
 
@@ -86,12 +87,14 @@ public class DimensionObject<T> implements Comparable<DimensionObject<T>>
   @Override
   public boolean equals(Object obj)
   {
-    if (obj == null)
+    if (obj == null) {
       return false;
-    if (!this.getClass().equals(obj.getClass()))
+    }
+    if (!this.getClass().equals(obj.getClass())) {
       return false;
+    }
     @SuppressWarnings("unchecked")
-    DimensionObject<T> obj2 = (DimensionObject<T>) obj;
+    DimensionObject<T> obj2 = (DimensionObject<T>)obj;
     return this.val.equals(obj2.val);
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
index 20dcf43..de0e1bd 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
@@ -18,9 +18,10 @@
  */
 package com.datatorrent.lib.logs;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
 
 /**
  * <p>
@@ -53,7 +54,7 @@ import java.util.HashMap;
  * @since 0.3.2
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class FilteredLineToTokenArrayList extends LineToTokenArrayList
 {
   HashMap<String, Object> filterBy = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
index f67b0b7..c79e884 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
@@ -18,9 +18,10 @@
  */
 package com.datatorrent.lib.logs;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
 
 /**
  * <p>
@@ -55,35 +56,35 @@ import java.util.HashMap;
  * @since 0.3.3
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class FilteredLineToTokenHashMap extends LineToTokenHashMap
 {
-	HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
+  HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
 
-	/**
-	 * setter function for filterBy
-	 *
-	 * @param list
-	 *          list of keys for subtoken filters
-	 */
-	public void setFilterBy(String[] list)
-	{
-		if (list != null) {
-			for (String s : list) {
-				filterBy.put(s, null);
-			}
-		}
-	}
+  /**
+   * setter function for filterBy
+   *
+   * @param list
+   *          list of keys for subtoken filters
+   */
+  public void setFilterBy(String[] list)
+  {
+    if (list != null) {
+      for (String s : list) {
+        filterBy.put(s, null);
+      }
+    }
+  }
 
-	/**
-	 * If the key is in the filter, returns true
-	 *
-	 * @param subtok
-	 * @return true if super.validToken (!isEmpty()) and filter has they token
-	 */
-	@Override
-	public boolean validSubTokenKey(String subtok)
-	{
-		return super.validToken(subtok) && filterBy.containsKey(subtok);
-	}
+  /**
+   * If the key is in the filter, returns true
+   *
+   * @param subtok
+   * @return true if super.validToken (!isEmpty()) and filter has the token
+   */
+  @Override
+  public boolean validSubTokenKey(String subtok)
+  {
+    return super.validToken(subtok) && filterBy.containsKey(subtok);
+  }
 }
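
The filtering above is driven entirely by the filterBy map: setFilterBy() records the allowed sub-token keys, and validSubTokenKey() lets a sub-token through only when it is non-empty and present in that map. A minimal configuration sketch (the key names "url" and "status" are illustrative, not from the commit, and DAG wiring is omitted):

  import com.datatorrent.lib.logs.FilteredLineToTokenHashMap;

  // Sketch: restrict emitted sub-tokens to the "url" and "status" keys.
  public class FilterBySketch
  {
    public static void main(String[] args)
    {
      FilteredLineToTokenHashMap parser = new FilteredLineToTokenHashMap();
      parser.setFilterBy(new String[]{"url", "status"});
      System.out.println(parser.validSubTokenKey("url"));    // true
      System.out.println(parser.validSubTokenKey("agent"));  // false
    }
  }

The same pattern applies to FilteredLineTokenizerKeyVal, which carries an identical filterBy/validSubTokenKey pair.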

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
index 37e66ff..a6acd4c 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
@@ -18,9 +18,10 @@
  */
 package com.datatorrent.lib.logs;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
 
 /**
  * Splits lines into tokens, and tokens into sub-tokens and emits key,val pairs in a HashMap.&nbsp;
@@ -52,35 +53,35 @@ import java.util.HashMap;
  * @since 0.3.3
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class FilteredLineTokenizerKeyVal extends LineTokenizerKeyVal
 {
-	HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
+  HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
 
-	/**
-	 * setter function for filterBy
-	 *
-	 * @param list
-	 *          list of keys for subtoken filters
-	 */
-	public void setFilterBy(String[] list)
-	{
-		if (list != null) {
-			for (String s : list) {
-				filterBy.put(s, null);
-			}
-		}
-	}
+  /**
+   * setter function for filterBy
+   *
+   * @param list
+   *          list of keys for subtoken filters
+   */
+  public void setFilterBy(String[] list)
+  {
+    if (list != null) {
+      for (String s : list) {
+        filterBy.put(s, null);
+      }
+    }
+  }
 
-	/**
-	 * If the key is in the filter, returns true
-	 *
-	 * @param subtok
-	 * @return true if super.validToken (!isEmpty()) and filter has they token
-	 */
-	@Override
-	public boolean validSubTokenKey(String subtok)
-	{
-		return super.validToken(subtok) && filterBy.containsKey(subtok);
-	}
+  /**
+   * If the key is in the filter, returns true
+   *
+   * @param subtok
+   * @return true if super.validToken (!isEmpty()) and filter has the token
+   */
+  @Override
+  public boolean validSubTokenKey(String subtok)
+  {
+    return super.validToken(subtok) && filterBy.containsKey(subtok);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
index ffc211d..182078b 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
@@ -56,7 +56,7 @@ import com.datatorrent.lib.util.UnifierArrayList;
  * @since 0.3.2
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class LineToTokenArrayList extends BaseLineTokenizer
 {
   protected transient ArrayList<String> tokentuple = null;
@@ -79,15 +79,16 @@ public class LineToTokenArrayList extends BaseLineTokenizer
   };
 
   /**
-	 * This output port emits a map from tokens to sub tokens.
-	 */
+   * This output port emits a map from tokens to sub tokens.
+   */
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>> splittokens = new DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>>()
+  public final transient DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>> splittokens =
+      new DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>>()
   {
     @Override
     public Unifier<ArrayList<HashMap<String, ArrayList<String>>>> getUnifier()
     {
-      return new UnifierArrayList<HashMap<String, ArrayList<String>>>();
+      return new UnifierArrayList<>();
     }
   };
 
@@ -152,8 +153,7 @@ public class LineToTokenArrayList extends BaseLineTokenizer
   {
     if (smap.isEmpty()) {
       smap.put(subtok, vals);
-    }
-    else {
+    } else {
       vals.add(subtok);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
index d0a67f3..0060c74 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
@@ -54,7 +54,7 @@ import com.datatorrent.lib.util.UnifierHashMap;
  * @since 0.3.2
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class LineToTokenHashMap extends BaseLineTokenizer
 {
   /**
@@ -103,8 +103,7 @@ public class LineToTokenHashMap extends BaseLineTokenizer
     if (vals == null) {
       tok = subtok;
       vals = new ArrayList<String>();
-    }
-    else {
+    } else {
       vals.add(subtok);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
index 1ddd40c..814d132 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
@@ -19,7 +19,8 @@
 package com.datatorrent.lib.logs;
 
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.*;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.BaseLineTokenizer;
 
 /**
@@ -47,25 +48,25 @@ import com.datatorrent.lib.util.BaseLineTokenizer;
  * @since 0.3.3
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class LineTokenizer extends BaseLineTokenizer
 {
   /**
    * The is the output port that emits string tokens.
    */
-	public final transient DefaultOutputPort<String> tokens = new DefaultOutputPort<String>();
+  public final transient DefaultOutputPort<String> tokens = new DefaultOutputPort<String>();
 
-	/**
-	 * emits tokens on port "tokens" if tok is not empty
-	 *
-	 * @param tok
-	 */
-	@Override
-	public void processToken(String tok)
-	{
-		if (!tok.isEmpty()) {
-			tokens.emit(tok);
-		}
-	}
+  /**
+   * emits tokens on port "tokens" if tok is not empty
+   *
+   * @param tok
+   */
+  @Override
+  public void processToken(String tok)
+  {
+    if (!tok.isEmpty()) {
+      tokens.emit(tok);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
index 2a568d1..0d89996 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
@@ -18,11 +18,13 @@
  */
 package com.datatorrent.lib.logs;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.*;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.BaseLineTokenizer;
 import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
 
 /**
  * This operator splits lines into tokens, and tokens into sub-tokens.&nbsp;
@@ -44,7 +46,8 @@ import java.util.HashMap;
  * <br>
  * <b>Properties</b>:<br>
  * <b>splitby</b>: The characters used to split the line. Default is ";\t "<br>
- * <b>splittokenby</b>: The characters used to split a token into key,val pair. Default is "", i.e. tokens are not split, and key is set to token, and val is null<br>
+ * <b>splittokenby</b>: The characters used to split a token into key,val pair. Default is "", i.e. tokens are not
+ * split, and key is set to token, and val is null<br>
  * <br>
  * </p>
  * @displayName Line Tokenizer Key Value
@@ -54,7 +57,7 @@ import java.util.HashMap;
  * @since 0.3.2
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class LineTokenizerKeyVal extends BaseLineTokenizer
 {
   /**
@@ -129,8 +132,7 @@ public class LineTokenizerKeyVal extends BaseLineTokenizer
   {
     if (skey.isEmpty()) {
       skey = subtok;
-    }
-    else if (sval.isEmpty()) {
+    } else if (sval.isEmpty()) {
       sval = subtok;
     }
   }
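
For the key/value tokenizer touched above, the documented behavior is that a line is first split into tokens by the splitby characters and each token is then split into a key,val pair by splittokenby; with splittokenby left at its default "" the whole token becomes the key and the value stays null. A rough usage sketch, assuming the setter names setSplitBy/setSplitTokenBy exposed by BaseLineTokenizer (those setters are not shown in this diff, so verify before relying on them):

  import com.datatorrent.lib.logs.LineTokenizerKeyVal;

  // Sketch under assumed setter names; verify against BaseLineTokenizer.
  public class KeyValTokenizerSketch
  {
    public static void main(String[] args)
    {
      LineTokenizerKeyVal kv = new LineTokenizerKeyVal();
      kv.setSplitBy(";");       // "a=1;b=2;c=3" -> tokens "a=1", "b=2", "c=3"
      kv.setSplitTokenBy("=");  // each token -> key "a", val "1", ...
      // The operator then emits one HashMap per line, e.g. {a=1, b=2, c=3}.
    }
  }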