Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:22 UTC

[4/6] apex-malhar git commit: Fixed checkstyle errors for demos.
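
The hunks below are almost entirely mechanical: trailing whitespace stripped from otherwise-blank lines, plus a couple of overly long call chains re-wrapped. To reproduce the violations locally, something along these lines should work (a sketch, assuming the usual maven-checkstyle-plugin binding; the module path is taken from the file paths below, and the exact goal/profile wiring may differ in the project's pom):

    mvn checkstyle:check -pl demos/highlevelapi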

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index 6a6777e..c8a0e51 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -57,7 +57,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
 public class WindowedWordCount implements StreamingApplication
 {
   static final int WINDOW_SIZE = 1;  // Default window duration in minutes
-  
+
   /**
    * An input operator that reads from a file and outputs it line by line to downstream, with a time gap
    * between every two lines.
@@ -65,23 +65,23 @@ public class WindowedWordCount implements StreamingApplication
   public static class TextInput extends BaseOperator implements InputOperator
   {
     private static boolean done = false;
-    
+
     public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-    
+
     private transient BufferedReader reader;
-  
+
     public static boolean isDone()
     {
       return done;
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
       done = false;
       initReader();
     }
-    
+
     private void initReader()
     {
       try {
@@ -91,13 +91,13 @@ public class WindowedWordCount implements StreamingApplication
         throw Throwables.propagate(ex);
       }
     }
-    
+
     @Override
     public void teardown()
     {
       IOUtils.closeQuietly(reader);
     }
-    
+
     @Override
     public void emitTuples()
     {
@@ -118,16 +118,16 @@ public class WindowedWordCount implements StreamingApplication
       }
     }
   }
-  
+
   public static class Collector extends BaseOperator
   {
     private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
-  
+
     public static Map<KeyValPair<Long, String>, Long> getResult()
     {
       return result;
     }
-  
+
     public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
     {
       @Override
@@ -137,7 +137,7 @@ public class WindowedWordCount implements StreamingApplication
       }
     };
   }
-  
+
   /**
    * A Pojo Tuple class used for outputting result to JDBC.
    */
@@ -146,44 +146,44 @@ public class WindowedWordCount implements StreamingApplication
     private String word;
     private long count;
     private long timestamp;
-  
+
     @Override
     public String toString()
     {
       return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
     }
-  
+
     public String getWord()
     {
       return word;
     }
-  
+
     public void setWord(String word)
     {
       this.word = word;
     }
-  
+
     public long getCount()
     {
       return count;
     }
-  
+
     public void setCount(long count)
     {
       this.count = count;
     }
-  
+
     public long getTimestamp()
     {
       return timestamp;
     }
-  
+
     public void setTimestamp(long timestamp)
     {
       this.timestamp = timestamp;
     }
   }
-  
+
   /**
    * A map function that wraps the input string with a randomly generated timestamp.
    */
@@ -191,12 +191,12 @@ public class WindowedWordCount implements StreamingApplication
   {
     private static final Duration RAND_RANGE = Duration.standardMinutes(10);
     private final Long minTimestamp;
-    
+
     AddTimestampFn()
     {
       this.minTimestamp = System.currentTimeMillis();
     }
-    
+
     @Override
     public Tuple.TimestampedTuple<String> f(String input)
     {
@@ -207,7 +207,7 @@ public class WindowedWordCount implements StreamingApplication
       return new Tuple.TimestampedTuple<>(randomTimestamp, input);
     }
   }
-  
+
   /** A MapFunction that converts a Word and Count into a PojoEvent. */
   public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
   {
@@ -221,7 +221,7 @@ public class WindowedWordCount implements StreamingApplication
       return row;
     }
   }
-  
+
   /**
    * Populate dag with High-Level API.
    * @param dag
@@ -232,10 +232,10 @@ public class WindowedWordCount implements StreamingApplication
   {
     TextInput input = new TextInput();
     Collector collector = new Collector();
-    
+
     // Create stream from the TextInput operator.
     ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
-      
+
         // Extract all the words from the input line of text.
         .flatMap(new Function.FlatMapFunction<String, String>()
         {
@@ -245,18 +245,18 @@ public class WindowedWordCount implements StreamingApplication
             return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
           }
         }, name("ExtractWords"))
-      
+
         // Wrap the word with a randomly generated timestamp.
         .map(new AddTimestampFn(), name("AddTimestampFn"));
-    
-   
+
+
     // apply window and trigger option.
     // TODO: change trigger option to atWaterMark when available.
     WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
         .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
         new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
-    
-    
+
+
     WindowedStream<PojoEvent> wordCounts =
         // Perform a countByKey transformation to count the appearance of each word in every time window.
         windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
@@ -268,10 +268,10 @@ public class WindowedWordCount implements StreamingApplication
               new KeyValPair<String, Long>(input.getValue(), 1L));
           }
         }, name("count words"))
-          
+
         // Format the output and print out the result.
         .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print();
-    
+
     wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
   }
 }
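
Every hunk in this file only removes trailing whitespace from blank lines, so the change is behavior-neutral. One way to confirm that (assuming 7d9386d2 from the blob URLs above is this commit) is to re-run the diff while ignoring whitespace:

    git show -w 7d9386d2 -- demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java

which should show no hunks left for this file.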

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 29c8cf9..00c40e7 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -79,12 +79,12 @@ public class AutoComplete implements StreamingApplication
     public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
     private transient BufferedReader reader;
-  
+
     public static boolean isDone()
     {
       return done;
     }
-  
+
     @Override
     public void setup(OperatorContext context)
     {
@@ -128,16 +128,16 @@ public class AutoComplete implements StreamingApplication
       }
     }
   }
-  
+
   public static class Collector extends BaseOperator
   {
     private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
-  
+
     public static Map<String, List<CompletionCandidate>> getResult()
     {
       return result;
     }
-  
+
     public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
     {
       @Override
@@ -193,7 +193,7 @@ public class AutoComplete implements StreamingApplication
           @Override
           public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
           {
-            // TODO: Should be removed after Auto-wrapping is supported. 
+            // TODO: Should be removed after Auto-wrapping is supported.
             return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
           }
         });
@@ -271,7 +271,8 @@ public class AutoComplete implements StreamingApplication
             {
               return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
             }
-          }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+          }, name("countByKey"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
           {
             @Override
             public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
@@ -300,7 +301,7 @@ public class AutoComplete implements StreamingApplication
 
     ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
         .flatMap(new ExtractHashtags());
-    
+
     tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
         .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector"))
         .populateDag(dag);
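
This file also contains one substantive re-wrap: the long countByKey(...).map(...) chain is split so that the chained .map() call starts on its own line, presumably to satisfy a line-length rule. Running the same whitespace-ignoring diff for this file, e.g.

    git show -w 7d9386d2 -- demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java

should leave only that hunk.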

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
index 8a7113e..5531b5e 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -45,7 +45,7 @@ public class CompletionCandidate implements Comparable<CompletionCandidate>
   // Empty constructor required for Kryo.
   public CompletionCandidate()
   {
-    
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
index 2a4c003..e7eb90c 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -24,18 +24,18 @@ package org.apache.apex.malhar.stream.sample.complete;
 public class PojoEvent extends Object
 {
   private String stringValue;
-  
+
   @Override
   public String toString()
   {
     return "PojoEvent [stringValue=" + getStringValue() + "]";
   }
-  
+
   public void setStringValue(String newString)
   {
     this.stringValue = newString;
   }
-  
+
   public String getStringValue()
   {
     return this.stringValue;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
index 2ffdc82..845901a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -45,17 +45,17 @@ public class StreamingWordExtract implements StreamingApplication
 {
   private static int wordCount = 0; // A counter for the number of words that have been extracted.
   private static int entriesMapped = 0; // A counter for the number of entries that have been mapped.
-  
+
   public int getWordCount()
   {
     return wordCount;
   }
-  
+
   public int getEntriesMapped()
   {
     return entriesMapped;
   }
-  
+
   /**
    * A MapFunction that tokenizes lines of text into individual words.
    */
@@ -69,8 +69,8 @@ public class StreamingWordExtract implements StreamingApplication
       return result;
     }
   }
-  
-  
+
+
   /**
    * A MapFunction that uppercases a word.
    */
@@ -82,8 +82,8 @@ public class StreamingWordExtract implements StreamingApplication
       return input.toUpperCase();
     }
   }
-  
-  
+
+
   /**
    * A filter function to filter out empty strings.
    */
@@ -95,14 +95,14 @@ public class StreamingWordExtract implements StreamingApplication
       return !input.isEmpty();
     }
   }
-  
-  
+
+
   /**
    * A map function to map the result string to a pojo entry.
    */
   public static class PojoMapper implements Function.MapFunction<String, Object>
   {
-  
+
     @Override
     public Object f(String input)
     {
@@ -112,7 +112,7 @@ public class StreamingWordExtract implements StreamingApplication
       return pojo;
     }
   }
-  
+
   /**
    * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
    */
@@ -122,7 +122,7 @@ public class StreamingWordExtract implements StreamingApplication
     fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
     return fieldInfos;
   }
-  
+
   /**
    * Populate dag with High-Level API.
    * @param dag
@@ -136,25 +136,25 @@ public class StreamingWordExtract implements StreamingApplication
     JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
     jdbcOutput.setStore(outputStore);
     jdbcOutput.setTablename("TestTable");
-    
+
     // Create a stream reading from a folder.
     ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
 
     // Extract all the words from the input line of text.
     stream.flatMap(new ExtractWords())
-      
+
         // Filter out the empty strings.
         .filter(new EmptyStringFilter())
-      
+
         // Change every word to uppercase.
         .map(new Uppercase())
-      
+
         // Map the resulting word to a Pojo entry.
         .map(new PojoMapper())
-      
+
         // Output the entries to JdbcOutput and insert them into a table.
         .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
-    
+
     stream.populateDag(dag);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index f2e70b1..d7d62fe 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -63,23 +63,23 @@ public class TopWikipediaSessions implements StreamingApplication
   {
     private String[] names = new String[]{"user1", "user2", "user3", "user4"};
     public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
-  
+
     private static final Duration RAND_RANGE = Duration.standardDays(365);
     private Long minTimestamp;
     private long sleepTime;
     private static int tupleCount = 0;
-  
+
     public static int getTupleCount()
     {
       return tupleCount;
     }
-  
+
     private String randomName(String[] names)
     {
       int index = new Random().nextInt(names.length);
       return names[index];
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
@@ -88,7 +88,7 @@ public class TopWikipediaSessions implements StreamingApplication
       minTimestamp = System.currentTimeMillis();
       sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
     }
-  
+
     @Override
     public void emitTuples()
     {
@@ -103,17 +103,17 @@ public class TopWikipediaSessions implements StreamingApplication
       }
     }
   }
-  
+
   public static class Collector extends BaseOperator
   {
     private final int resultSize = 5;
     private static List<List<TempWrapper>> result = new ArrayList<>();
-  
+
     public static List<List<TempWrapper>> getResult()
     {
       return result;
     }
-  
+
     public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
     {
       @Override
@@ -126,8 +126,8 @@ public class TopWikipediaSessions implements StreamingApplication
       }
     };
   }
-  
-  
+
+
   /**
    * Convert the upstream (user, time) combination to a timestamped tuple of user.
    */
@@ -138,13 +138,13 @@ public class TopWikipediaSessions implements StreamingApplication
     {
       long timestamp = input.getValue();
       String userName = input.getKey();
-   
+
       // Sets the implicit timestamp field to be used in windowing.
       return new Tuple.TimestampedTuple<>(timestamp, userName);
-      
+
     }
   }
-  
+
   /**
    * Computes the number of edits in each user session.  A session is defined as
    * a string of edits where each is separated from the next by less than an hour.
@@ -156,10 +156,10 @@ public class TopWikipediaSessions implements StreamingApplication
     public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
     {
       return inputStream
-        
+
         // Chunk the stream into session windows.
         .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-        
+
         // Count the number of edits for a user within one session.
         .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
         {
@@ -171,7 +171,7 @@ public class TopWikipediaSessions implements StreamingApplication
         }, name("ComputeSessions"));
     }
   }
-  
+
   /**
    * A comparator class used for comparing two TempWrapper objects.
    */
@@ -183,7 +183,7 @@ public class TopWikipediaSessions implements StreamingApplication
       return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
     }
   }
-  
+
   /**
    * A function to extract timestamp from a TempWrapper object.
    */
@@ -196,7 +196,7 @@ public class TopWikipediaSessions implements StreamingApplication
       return input.getTimestamp();
     }
   }
-  
+
   /**
    * A temporary wrapper that wraps a KeyValPair and a timestamp together to represent a timestamped tuple; the reason
    * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue is resolved, we can
@@ -206,39 +206,39 @@ public class TopWikipediaSessions implements StreamingApplication
   {
     private KeyValPair<String, Long> value;
     private Long timestamp;
-    
+
     public TempWrapper()
     {
-      
+
     }
-    
+
     public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
     {
       this.value = value;
       this.timestamp = timestamp;
     }
-  
+
     @Override
     public String toString()
     {
       return this.value + "  -  " + this.timestamp;
     }
-  
+
     public Long getTimestamp()
     {
       return timestamp;
     }
-  
+
     public void setTimestamp(Long timestamp)
     {
       this.timestamp = timestamp;
     }
-  
+
     public KeyValPair<String, Long> getValue()
     {
       return value;
     }
-  
+
     public void setValue(KeyValPair<String, Long> value)
     {
       this.value = value;
@@ -251,16 +251,16 @@ public class TopWikipediaSessions implements StreamingApplication
   private static class TopPerMonth
       extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
   {
-    
+
     @Override
     public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
     {
       TopN<TempWrapper> topN = new TopN<>();
       topN.setN(10);
       topN.setComparator(new Comp());
-      
+
       return inputStream
-        
+
         // Map the input WindowedTuple to a TempWrapper object.
         .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
         {
@@ -270,15 +270,15 @@ public class TopWikipediaSessions implements StreamingApplication
             return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
           }
         }, name("TempWrapper"))
-        
+
         // Apply window and trigger option again, this time chunking the stream into fixed time windows.
         .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
-        
+
         // Compute the top 10 user sessions with the most edits.
         .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
     }
   }
-  
+
   /**
    * A map function that combines the user and his/her edit session into a string, and uses that string as the key
    * with the number of edits in that session as the value, to create a new key-value pair to send downstream.
@@ -293,7 +293,7 @@ public class TopWikipediaSessions implements StreamingApplication
         input.getValue().getValue()));
     }
   }
-  
+
   /**
    * A flatmap function that turns the result into a readable format.
    */
@@ -311,7 +311,7 @@ public class TopWikipediaSessions implements StreamingApplication
       return result;
     }
   }
-  
+
   /**
    * A composite transform that computes the top Wikipedia sessions.
    */
@@ -327,7 +327,7 @@ public class TopWikipediaSessions implements StreamingApplication
         .addCompositeStreams(new TopPerMonth());
     }
   }
-  
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 26a2823..3045238 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -63,7 +63,7 @@ public class TrafficRoutes implements StreamingApplication
   static Map<String, String> sdStations = buildStationInfo();
   static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
   static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
-  
+
   /**
    * This class holds information about a station reading's average speed.
    */
@@ -75,54 +75,54 @@ public class TrafficRoutes implements StreamingApplication
     Double avgSpeed;
     @Nullable
     Long timestamp;
-    
+
     public StationSpeed() {}
-    
+
     public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
     {
       this.stationId = stationId;
       this.avgSpeed = avgSpeed;
       this.timestamp = timestamp;
     }
-  
+
     public void setAvgSpeed(@Nullable Double avgSpeed)
     {
       this.avgSpeed = avgSpeed;
     }
-  
+
     public void setStationId(@Nullable String stationId)
     {
       this.stationId = stationId;
     }
-  
+
     public void setTimestamp(@Nullable Long timestamp)
     {
       this.timestamp = timestamp;
     }
-  
+
     @Nullable
     public Long getTimestamp()
     {
       return timestamp;
     }
-  
+
     public String getStationId()
     {
       return this.stationId;
     }
-    
+
     public Double getAvgSpeed()
     {
       return this.avgSpeed;
     }
-    
+
     @Override
     public int compareTo(StationSpeed other)
     {
       return Long.compare(this.timestamp, other.timestamp);
     }
   }
-  
+
   /**
    * This class holds information about a route's speed/slowdown.
    */
@@ -134,63 +134,63 @@ public class TrafficRoutes implements StreamingApplication
     Double avgSpeed;
     @Nullable
     Boolean slowdownEvent;
-    
+
     public RouteInfo()
     {
-      
+
     }
-    
+
     public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
     {
       this.route = route;
       this.avgSpeed = avgSpeed;
       this.slowdownEvent = slowdownEvent;
     }
-    
+
     public String getRoute()
     {
       return this.route;
     }
-    
+
     public Double getAvgSpeed()
     {
       return this.avgSpeed;
     }
-    
+
     public Boolean getSlowdownEvent()
     {
       return this.slowdownEvent;
     }
   }
-  
+
   /**
    * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
    * with the extracted timestamp.
    */
   static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
   {
-    
+
     @Override
     public Tuple.TimestampedTuple<String> f(String input)
     {
       String[] items = input.split(",");
       String timestamp = tryParseTimestamp(items);
-    
+
       return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
     }
   }
-  
+
   /**
    * Filter out readings for the stations along predefined 'routes', and output
    * (station, speed info) keyed on route.
    */
   static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
   {
-    
+
     @Override
     public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
     {
-      
+
       ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
       String[] items = input.getValue().split(",");
       String stationType = tryParseStationType(items);
@@ -210,7 +210,7 @@ public class TrafficRoutes implements StreamingApplication
       return result;
     }
   }
-  
+
   /**
    * For a given route, track average speed for the window. Calculate whether
    * traffic is currently slowing down, via a predefined threshold. If a supermajority of
@@ -261,7 +261,7 @@ public class TrafficRoutes implements StreamingApplication
       return result;
     }
   }
-  
+
   /**
    * Output Pojo class for outputting result to JDBC.
    */
@@ -271,11 +271,11 @@ public class TrafficRoutes implements StreamingApplication
     private Boolean slowdownEvent;
     private String key;
     private Long timestamp;
-    
+
     public OutputPojo()
     {
     }
- 
+
     public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
     {
       this.avgSpeed = avgSpeed;
@@ -283,64 +283,64 @@ public class TrafficRoutes implements StreamingApplication
       this.key = key;
       this.timestamp = timestamp;
     }
-  
+
     @Override
     public String toString()
     {
       return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
     }
-  
+
     public void setTimestamp(Long timestamp)
     {
       this.timestamp = timestamp;
     }
-  
+
     public Long getTimestamp()
     {
       return timestamp;
     }
-  
+
     public void setAvgSpeed(Double avgSpeed)
     {
       this.avgSpeed = avgSpeed;
     }
-  
+
     public Double getAvgSpeed()
     {
       return avgSpeed;
     }
-  
+
     public void setKey(String key)
     {
       this.key = key;
     }
-  
+
     public String getKey()
     {
       return key;
     }
-  
+
     public void setSlowdownEvent(Boolean slowdownEvent)
     {
       this.slowdownEvent = slowdownEvent;
     }
-  
+
     public Boolean getSlowdownEvent()
     {
       return slowdownEvent;
     }
-    
+
   }
-  
+
   public static class Collector extends BaseOperator
   {
     private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
-  
+
     public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
     {
       return result;
     }
-    
+
     public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
     {
       @Override
@@ -350,7 +350,7 @@ public class TrafficRoutes implements StreamingApplication
       }
     };
   }
-  
+
   /**
    * Format the results of the slowdown calculations to an OutputPojo.
    */
@@ -364,8 +364,8 @@ public class TrafficRoutes implements StreamingApplication
       return row;
     }
   }
-    
-  
+
+
   /**
    * This composite transformation extracts speed info from traffic station readings.
    * It groups the readings by 'route' and analyzes traffic slowdown for that route.
@@ -389,19 +389,19 @@ public class TrafficRoutes implements StreamingApplication
               return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
             }
           }, name("GroupByKey"));
-      
+
       // Analyze 'slowdown' over the route readings.
       WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
           .flatMap(new GatherStats(), name("GatherStats"));
-      
+
       // Format the results for writing to JDBC table.
       WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
-      
+
       return results;
     }
   }
-  
-  
+
+
   private static Double tryParseAvgSpeed(String[] inputItems)
   {
     try {
@@ -412,27 +412,27 @@ public class TrafficRoutes implements StreamingApplication
       return null;
     }
   }
-  
+
   private static String tryParseStationType(String[] inputItems)
   {
     return tryParseString(inputItems, 2);
   }
-  
+
   private static String tryParseStationId(String[] inputItems)
   {
     return tryParseString(inputItems, 1);
   }
-  
+
   private static String tryParseTimestamp(String[] inputItems)
   {
     return tryParseString(inputItems, 0);
   }
-  
+
   private static String tryParseString(String[] inputItems, int index)
   {
     return inputItems.length >= index ? inputItems[index] : null;
   }
-  
+
   /**
    * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
    */
@@ -444,33 +444,33 @@ public class TrafficRoutes implements StreamingApplication
     stations.put("1108702", "SDRoute2");
     return stations;
   }
-  
+
   /**
    * A dummy generator to generate some traffic information.
    */
   public static class InfoGen extends BaseOperator implements InputOperator
   {
     public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-    
+
     private String[] stationTypes = new String[]{"ML", "BL", "GL"};
     private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
     private double ave = 55.0;
     private long timestamp;
     private static final Duration RAND_RANGE = Duration.standardMinutes(10);
     private static int tupleCount = 0;
-  
+
     public static int getTupleCount()
     {
       return tupleCount;
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
       tupleCount = 0;
       timestamp = System.currentTimeMillis();
     }
-  
+
     @Override
     public void emitTuples()
     {
@@ -481,7 +481,7 @@ public class TrafficRoutes implements StreamingApplication
           try {
             output.emit(time + "," + stationID + "," + stationType + "," + speed);
             tupleCount++;
-         
+
             Thread.sleep(50);
           } catch (Exception e) {
             // Ignore it
@@ -490,29 +490,29 @@ public class TrafficRoutes implements StreamingApplication
       }
     }
   }
-  
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     InfoGen infoGen = new InfoGen();
     Collector collector = new Collector();
-    
+
     // Create a stream from the input operator.
     ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
-        
+
         // Extract the timestamp from the input and wrap it into a TimestampedTuple.
         .map(new ExtractTimestamps(), name("ExtractTimestamps"));
-    
+
     stream
         // Extract the average speed of a station.
         .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
-      
+
         // Apply window and trigger option.
         .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
-        
+
         // Apply TrackSpeed composite transformation to compute the route information.
         .addCompositeStreams(new TrackSpeed())
-      
+
         // print the result to console.
         .print()
         .endWith(collector, collector.input, name("Collector"))

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index ecad622..a4fdf24 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -200,7 +200,8 @@ public class TwitterAutoComplete implements StreamingApplication
             {
               return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
             }
-          }, name("Hashtag Count")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+          }, name("Hashtag Count"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
           {
             @Override
             public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index d88a8dc..7c16521 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -94,7 +94,7 @@ public class CombinePerKeyExamples implements StreamingApplication
       return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
     }
   }
-  
+
   /**
    * A reduce function to concat two strings together.
    */
@@ -106,7 +106,7 @@ public class CombinePerKeyExamples implements StreamingApplication
       return input1 + ", " + input2;
     }
   }
-  
+
   /**
    * Reads the public 'Shakespeare' data, and for each word in the dataset
    * over a given length, generates a string containing the list of play names
@@ -114,17 +114,17 @@ public class CombinePerKeyExamples implements StreamingApplication
    */
   private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
   {
-    
+
     @Override
     public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
     {
       return inputStream
           // Extract words from the input SampleBean stream.
           .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
-          
+
           // Apply window and trigger option to the streams.
           .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-        
+
           // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
           .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
           {
@@ -134,13 +134,13 @@ public class CombinePerKeyExamples implements StreamingApplication
               return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
             }
           }, name("Concat"))
-        
+
           // Format the output back to a SampleBean object.
           .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
     }
   }
-  
-  
+
+
   /**
    * A Java Bean class that contains information about a word that appears in a corpus written by Shakespeare.
    */
@@ -157,13 +157,13 @@ public class CombinePerKeyExamples implements StreamingApplication
       this.word = word;
       this.corpus = corpus;
     }
-  
+
     @Override
     public String toString()
     {
       return this.word + " : "  + this.corpus;
     }
-  
+
     private String word;
 
     private String corpus;
@@ -188,7 +188,7 @@ public class CombinePerKeyExamples implements StreamingApplication
       return corpus;
     }
   }
-  
+
   /**
    * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
    * data.
@@ -200,19 +200,19 @@ public class CombinePerKeyExamples implements StreamingApplication
     private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
     private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
     private static int i;
-  
+
     public static int getI()
     {
       return i;
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
       super.setup(context);
       i = 0;
     }
-  
+
     @Override
     public void emitTuples()
     {
@@ -229,20 +229,20 @@ public class CombinePerKeyExamples implements StreamingApplication
         }
         i++;
       }
-    
+
     }
   }
-  
+
   public static class Collector extends BaseOperator
   {
     static List<SampleBean> result;
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
       result = new ArrayList<>();
     }
-  
+
     public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
     {
       @Override
@@ -252,7 +252,7 @@ public class CombinePerKeyExamples implements StreamingApplication
       }
     };
   }
-  
+
   /**
    * Populate dag using High-Level API.
    * @param dag
@@ -268,6 +268,6 @@ public class CombinePerKeyExamples implements StreamingApplication
       .print()
       .endWith(collector, collector.input, name("Collector"))
       .populateDag(dag);
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 2930010..0cd7c58 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -47,22 +47,22 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
 @ApplicationAnnotation(name = "DeDupExample")
 public class DeDupExample implements StreamingApplication
 {
-  
+
   public static class Collector extends BaseOperator
   {
     private static Tuple.WindowedTuple<List<String>> result;
     private static boolean done = false;
-  
+
     public static Tuple.WindowedTuple<List<String>> getResult()
     {
       return result;
     }
-  
+
     public static boolean isDone()
     {
       return done;
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
@@ -70,7 +70,7 @@ public class DeDupExample implements StreamingApplication
       result = new Tuple.WindowedTuple<>();
       done = false;
     }
-  
+
     public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
     {
       @Override
@@ -83,15 +83,15 @@ public class DeDupExample implements StreamingApplication
       }
     };
   }
-    
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     Collector collector = new Collector();
-    
+
     // Create a stream that reads from files in a local folder and outputs lines one by one to downstream.
     ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
-      
+
         // Extract all the words from the input line of text.
         .flatMap(new Function.FlatMapFunction<String, String>()
         {
@@ -101,7 +101,7 @@ public class DeDupExample implements StreamingApplication
             return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
           }
         }, name("ExtractWords"))
-      
+
         // Change the words to lower case; also shut down the app when the word "bye" is detected.
         .map(new Function.MapFunction<String, String>()
         {
@@ -111,14 +111,14 @@ public class DeDupExample implements StreamingApplication
             return input.toLowerCase();
           }
         }, name("ToLowerCase"));
-    
+
     // Apply window and trigger option.
     stream.window(new WindowOption.GlobalWindow(),
         new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
-        
+
         // Remove the duplicate words and print out the result.
         .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input)
-    
+
         .populateDag(dag);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
index 3643eab..1ba2a90 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -27,48 +27,48 @@ public class InputPojo extends Object
   private int day;
   private int year;
   private double meanTemp;
-  
+
   @Override
   public String toString()
   {
     return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]";
   }
-  
+
   public void setMonth(int month)
   {
     this.month = month;
   }
-  
+
   public int getMonth()
   {
     return this.month;
   }
-  
+
   public void setDay(int day)
   {
     this.day = day;
   }
-  
+
   public int getDay()
   {
     return day;
   }
-  
+
   public void setYear(int year)
   {
     this.year = year;
   }
-  
+
   public int getYear()
   {
     return year;
   }
-  
+
   public void setMeanTemp(double meanTemp)
   {
     this.meanTemp = meanTemp;
   }
-  
+
   public double getMeanTemp()
   {
     return meanTemp;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 02980e4..4538aef 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -55,7 +55,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
 @ApplicationAnnotation(name = "MaxPerKeyExamples")
 public class MaxPerKeyExamples implements StreamingApplication
 {
-  
+
   /**
    *  A map function to extract the mean temperature from {@link InputPojo}.
    */
@@ -69,8 +69,8 @@ public class MaxPerKeyExamples implements StreamingApplication
       return new KeyValPair<Integer, Double>(month, meanTemp);
     }
   }
-  
-  
+
+
   /**
    * A map function to format output to {@link OutputPojo}.
    */
@@ -85,7 +85,7 @@ public class MaxPerKeyExamples implements StreamingApplication
       return row;
     }
   }
-  
+
   /**
    * A composite transformation to perform three tasks:
    * 1. extract the month and its mean temperature from input pojo.
@@ -99,7 +99,7 @@ public class MaxPerKeyExamples implements StreamingApplication
     {
       // InputPojo... => <month, meanTemp> ...
       WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
-      
+
       // month, meanTemp... => <month, max mean temp>...
       WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
           temps.accumulateByKey(new Max<Double>(),
@@ -111,14 +111,14 @@ public class MaxPerKeyExamples implements StreamingApplication
                 return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
               }
             }, name("MaxPerMonth"));
-      
+
       // <month, max>... => OutputPojo...
       WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
-      
+
       return results;
     }
   }
-  
+
   /**
    * Method to set field info for {@link JdbcPOJOInputOperator}.
    * @return
@@ -132,7 +132,7 @@ public class MaxPerKeyExamples implements StreamingApplication
     fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
     return fieldInfos;
   }
-  
+
   /**
    * Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
    * @return
@@ -144,8 +144,8 @@ public class MaxPerKeyExamples implements StreamingApplication
     fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
     return fieldInfos;
   }
-  
-  
+
+
   /**
    * Populate the dag using High-Level API.
    * @param dag
@@ -156,21 +156,21 @@ public class MaxPerKeyExamples implements StreamingApplication
   {
     JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
     jdbcInput.setFieldInfos(addInputFieldInfos());
-  
+
     JdbcStore store = new JdbcStore();
     jdbcInput.setStore(store);
-  
+
     JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
     jdbcOutput.setFieldInfos(addOutputFieldInfos());
     JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
     jdbcOutput.setStore(outputStore);
-    
+
     // Create stream that reads from a Jdbc Input.
     ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
-      
+
         // Apply window and trigger option to the stream.
         .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-      
+
         // Because Jdbc Input sends out a stream of Object, we need to cast them to InputPojo.
         .map(new Function.MapFunction<Object, InputPojo>()
         {
@@ -180,10 +180,10 @@ public class MaxPerKeyExamples implements StreamingApplication
             return (InputPojo)input;
           }
         }, name("ObjectToInputPojo"))
-      
+
         // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
         .addCompositeStreams(new MaxMeanTemp())
-      
+
         // Cast the resulting OutputPojo to Object for Jdbc Output to consume.
         .map(new Function.MapFunction<OutputPojo, Object>()
         {
@@ -193,11 +193,11 @@ public class MaxPerKeyExamples implements StreamingApplication
             return (Object)input;
           }
         }, name("OutputPojoToObject"))
-      
+
         // Output the result to Jdbc Output.
         .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
-    
+
     stream.populateDag(dag);
-  
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
index db2a09e..59831b7 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -25,28 +25,28 @@ public class OutputPojo
 {
   private int month;
   private double meanTemp;
-  
+
   @Override
   public String toString()
   {
     return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
   }
-  
+
   public void setMonth(int month)
   {
     this.month = month;
   }
-  
+
   public int getMonth()
   {
     return this.month;
   }
-  
+
   public void setMeanTemp(double meanTemp)
   {
     this.meanTemp = meanTemp;
   }
-  
+
   public double getMeanTemp()
   {
     return meanTemp;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index bf23e3a..dd09352 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -189,7 +189,7 @@ public class TriggerExample
       // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
       // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
       // late, and dropped.
-  
+
       WindowedStream<SampleBean> defaultTriggerResults = inputStream
           .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
           new TriggerOption().discardingFiredPanes())
@@ -306,7 +306,7 @@ public class TriggerExample
     @Override
     public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
     {
-  
+
       WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
           .groupByKey(new ExtractFlowInfo());
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
index 101953f..d32da72 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -37,11 +37,11 @@ public class MinimalWordCountTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-  
+
     MinimalWordCount app = new MinimalWordCount();
 
     lma.prepareDAG(app, conf);
- 
+
     LocalMode.Controller lc = lma.getController();
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
@@ -51,9 +51,9 @@ public class MinimalWordCountTest
         return MinimalWordCount.Collector.isDone();
       }
     });
-    
+
     lc.run(10000);
-  
+
     Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
     Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
     Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
index 952356f..f6270d4 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -56,16 +56,16 @@ public class WindowedWordCountTest
         return WindowedWordCount.TextInput.isDone();
       }
     });
-    
+
     lc.run(60000);
-    
+
     Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
     Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
     Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
     Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
     Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
   }
-  
+
   public long countSum(Map<KeyValPair<Long, String>, Long> map)
   {
     long sum = 0;
@@ -74,7 +74,7 @@ public class WindowedWordCountTest
     }
     return sum;
   }
-  
+
   public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
   {
     long sum = 0;
@@ -85,6 +85,6 @@ public class WindowedWordCountTest
     }
     return sum;
   }
-  
+
 }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
index dc236f9..26bb13e 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -41,7 +41,7 @@ public class AutoCompleteTest
     Configuration conf = new Configuration(false);
     lma.prepareDAG(new AutoComplete(), conf);
     LocalMode.Controller lc = lma.getController();
-    
+
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
       @Override
@@ -50,9 +50,9 @@ public class AutoCompleteTest
         return AutoComplete.TweetsInput.isDone();
       }
     });
-    
+
     lc.run(200000);
-  
+
     Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi"));
     Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china"));
     Assert.assertEquals(2, AutoComplete.Collector.getResult().get("china").get(0).getCount());
@@ -61,6 +61,6 @@ public class AutoCompleteTest
     Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size());
     Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount() >= AutoComplete.Collector.getResult().get("f").get(1).getCount());
     Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount() >= AutoComplete.Collector.getResult().get("f").get(2).getCount());
-  
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
index bf9b030..dc9cdec 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -54,10 +54,10 @@ public class StreamingWordExtractTest
   {
     try {
       Class.forName(DB_DRIVER).newInstance();
-      
+
       Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
       Statement stmt = con.createStatement();
-      
+
       String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
           + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
           + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
@@ -66,16 +66,16 @@ public class StreamingWordExtractTest
           + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
           + ")";
       stmt.executeUpdate(createMetaTable);
-      
+
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
           + "(STRINGVALUE VARCHAR(255))";
       stmt.executeUpdate(createTable);
-      
+
     } catch (Throwable e) {
       throw Throwables.propagate(e);
     }
   }
-  
+
   @After
   public void cleanTable()
   {
@@ -88,7 +88,7 @@ public class StreamingWordExtractTest
       throw new RuntimeException(e);
     }
   }
-  
+
   public void setConfig(Configuration conf)
   {
     conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
@@ -99,14 +99,14 @@ public class StreamingWordExtractTest
     conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
     conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
   }
-  
+
   public int getNumOfEventsInStore()
   {
     Connection con;
     try {
       con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
       Statement stmt = con.createStatement();
-      
+
       String countQuery = "SELECT count(*) from " + TABLE_NAME;
       ResultSet resultSet = stmt.executeQuery(countQuery);
       resultSet.next();
@@ -115,7 +115,7 @@ public class StreamingWordExtractTest
       throw new RuntimeException("fetching count", e);
     }
   }
-  
+
   @Test
   public void StreamingWordExtractTest() throws Exception
   {
@@ -125,7 +125,7 @@ public class StreamingWordExtractTest
     StreamingWordExtract app = new StreamingWordExtract();
     lma.prepareDAG(app, conf);
     LocalMode.Controller lc = lma.getController();
-    
+
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
       @Override
@@ -134,11 +134,11 @@ public class StreamingWordExtractTest
         return getNumOfEventsInStore() == 36;
       }
     });
-    
+
     lc.run(10000);
-  
+
     Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
     Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
index f8ec086..c0dbaf4 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -41,7 +41,7 @@ public class TopWikipediaSessionsTest
     Configuration conf = new Configuration(false);
     lma.prepareDAG(new TopWikipediaSessions(), conf);
     LocalMode.Controller lc = lma.getController();
-    
+
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
       @Override
@@ -50,14 +50,14 @@ public class TopWikipediaSessionsTest
         return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
       }
     });
-    
+
     lc.run(30000);
-    
+
     for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
       Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
     }
   }
-  
+
   public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
   {
     if (input.size() == 0 || input.size() == 1) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
index e363ca7..c532898 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -43,7 +43,7 @@ public class TrafficRoutesTest
     Configuration conf = new Configuration(false);
     lma.prepareDAG(new TrafficRoutes(), conf);
     LocalMode.Controller lc = lma.getController();
-    
+
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
       @Override
@@ -52,9 +52,9 @@ public class TrafficRoutesTest
         return TrafficRoutes.InfoGen.getTupleCount() >= 100;
       }
     });
-    
+
     lc.run(60000);
-    
+
     Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty());
     for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
       Assert.assertTrue(entry.getValue().getKey() <= 75);
@@ -62,5 +62,5 @@ public class TrafficRoutesTest
       Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
index 5858013..b130808 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -35,11 +35,11 @@ public class CombinePerKeyExamplesTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-  
+
     CombinePerKeyExamples app = new CombinePerKeyExamples();
-      
+
     lma.prepareDAG(app, conf);
-    
+
     LocalMode.Controller lc = lma.getController();
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
@@ -50,7 +50,7 @@ public class CombinePerKeyExamplesTest
       }
     });
     lc.run(100000);
-  
+
     Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
index ed4ddb4..a175cd7 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -38,7 +38,7 @@ public class DeDupExampleTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-    
+
     DeDupExample app = new DeDupExample();
     lma.prepareDAG(app, conf);
     LocalMode.Controller lc = lma.getController();
@@ -51,9 +51,9 @@ public class DeDupExampleTest
       }
     });
     lc.run(50000);
-  
+
     Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
index 51981de..ec28b40 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -46,7 +46,7 @@ import com.datatorrent.stram.StramLocalCluster;
  */
 public class MaxPerKeyExamplesTest
 {
-  
+
   private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
   private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
   private static final String DB_DRIVER = "org.h2.Driver";
@@ -56,18 +56,18 @@ public class MaxPerKeyExamplesTest
   private static final String USER_NAME = "root";
   private static final String PSW = "password";
   private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
-  
+
   private static final double[] MEANTEMPS = {85.3, 75.4};
-  
+
   @BeforeClass
   public static void setup()
   {
     try {
       Class.forName(DB_DRIVER).newInstance();
-      
+
       Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
       Statement stmt = con.createStatement();
-      
+
       String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
           + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
           + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
@@ -76,53 +76,53 @@ public class MaxPerKeyExamplesTest
           + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
           + ")";
       stmt.executeUpdate(createMetaTable);
-      
+
       String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
           + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
       stmt.executeUpdate(createInputTable);
-  
+
       String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
           + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
       stmt.executeUpdate(createOutputTable);
-      
+
       String cleanTable = "truncate table " + INPUT_TABLE;
       stmt.executeUpdate(cleanTable);
-  
+
       stmt = con.createStatement();
-  
+
       String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
       stmt.executeUpdate(sql);
       sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
       stmt.executeUpdate(sql);
       sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
       stmt.executeUpdate(sql);
-      
+
     } catch (Throwable e) {
       throw Throwables.propagate(e);
     }
   }
-  
+
   @AfterClass
   public static void cleanup()
   {
     try {
       Class.forName(DB_DRIVER).newInstance();
-  
+
       Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
       Statement stmt = con.createStatement();
-  
+
       String dropInputTable = "DROP TABLE " + INPUT_TABLE;
       stmt.executeUpdate(dropInputTable);
-  
+
       String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
       stmt.executeUpdate(dropOutputTable);
-      
+
     } catch (Throwable e) {
       throw Throwables.propagate(e);
     }
-    
+
   }
-  
+
   public void setConfig(Configuration conf)
   {
     conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
@@ -133,7 +133,7 @@ public class MaxPerKeyExamplesTest
     conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
     conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
     conf.set("dt.operator.jdbcInput.prop.query", QUERY);
-  
+
     conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
     conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
     conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
@@ -142,14 +142,14 @@ public class MaxPerKeyExamplesTest
     conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
     conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
   }
-  
+
   public int getNumEntries()
   {
     Connection con;
     try {
       con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
       Statement stmt = con.createStatement();
-    
+
       String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
       ResultSet resultSet = stmt.executeQuery(countQuery);
       resultSet.next();
@@ -158,7 +158,7 @@ public class MaxPerKeyExamplesTest
       throw new RuntimeException("fetching count", e);
     }
   }
-  
+
   public Map<Integer, Double> getMaxMeanTemp()
   {
     Map<Integer, Double> result = new HashMap<>();
@@ -166,30 +166,30 @@ public class MaxPerKeyExamplesTest
     try {
       con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
       Statement stmt = con.createStatement();
-    
+
       String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
       ResultSet resultSet = stmt.executeQuery(countQuery);
       while (resultSet.next()) {
         result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
-        
+
       }
       return result;
     } catch (SQLException e) {
       throw new RuntimeException("fetching count", e);
     }
   }
-  
+
   @Test
   public void MaxPerKeyExampleTest() throws Exception
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
     setConfig(conf);
-    
+
     MaxPerKeyExamples app = new MaxPerKeyExamples();
-  
+
     lma.prepareDAG(app, conf);
-  
+
     LocalMode.Controller lc = lma.getController();
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
     {
@@ -199,9 +199,9 @@ public class MaxPerKeyExamplesTest
         return getNumEntries() == 2;
       }
     });
-    
+
     lc.run(5000);
-    
+
     double[] result = new double[2];
     result[0] = getMaxMeanTemp().get(6);
     result[1] = getMaxMeanTemp().get(7);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
index 5f93206..7fbdfd1 100644
--- a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
@@ -18,23 +18,24 @@
  */
 package com.datatorrent.demos.iteration;
 
-import com.datatorrent.api.Context;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
 
 /**
  * Iteration demo : <br>
@@ -64,10 +65,10 @@ import java.io.PrintStream;
  *
  * @since 3.4.0
  */
-@ApplicationAnnotation(name="IterationDemo")
+@ApplicationAnnotation(name = "IterationDemo")
 public class Application implements StreamingApplication
 {
-  private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
   private String extraOutputFileName; // for unit test
 
   public static class FibonacciOperator extends BaseOperator
@@ -117,7 +118,7 @@ public class Application implements StreamingApplication
       public void process(Object t)
       {
         String s = t.toString();
-        System.out.println(s);
+        LOG.info(s);
         if (extraOutputStream != null) {
           extraOutputStream.println(s);
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
index 7804fcd..9fb89ac 100644
--- a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
+++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
@@ -18,17 +18,16 @@
  */
 package com.datatorrent.demos.iteration;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.api.LocalMode;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.LocalMode;
 
 /**
  *
@@ -61,7 +60,8 @@ public class ApplicationTest
       if (file.length() > 50) {
         break;
       }
-    } while (System.currentTimeMillis() - startTime < timeout);
+    }
+    while (System.currentTimeMillis() - startTime < timeout);
 
     lc.shutdown();
     try (BufferedReader br = new BufferedReader(new FileReader(outputFileName))) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
index 32aac35..55b299f 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
@@ -18,22 +18,23 @@
  */
 package com.datatorrent.demos.machinedata;
 
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
 import com.datatorrent.demos.machinedata.data.MachineKey;
 import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
 import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
 import com.datatorrent.lib.io.SmtpOutputOperator;
 
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * <p>
  * Resource monitor application.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
index 77b39b5..75c2a02 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
@@ -18,18 +18,13 @@
  */
 package com.datatorrent.demos.machinedata;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.demos.machinedata.data.MachineInfo;
 import com.datatorrent.demos.machinedata.data.MachineKey;
 
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * <p>
  * Information tuple generator with randomness.
@@ -42,9 +37,10 @@ public class DimensionGenerator extends BaseOperator
 {
   public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
   public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
-  private int threshold=90;
+  private int threshold = 90;
 
-  public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() {
+  public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+  {
 
     @Override
     public void process(MachineInfo tuple)
@@ -113,9 +109,9 @@ public class DimensionGenerator extends BaseOperator
       int hdd = tuple.getHdd();
       MachineInfo machineInfo = new MachineInfo();
       machineInfo.setMachineKey(machineKey);
-      machineInfo.setCpu((cpu < threshold)?cpu:threshold);
-      machineInfo.setRam((ram < threshold)?ram:threshold);
-      machineInfo.setHdd((hdd < threshold)?hdd:threshold);
+      machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
+      machineInfo.setRam((ram < threshold) ? ram : threshold);
+      machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
       outputInline.emit(machineInfo);
       output.emit(machineInfo);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
index 560df52..85ec954 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
@@ -18,20 +18,23 @@
  */
 package com.datatorrent.demos.machinedata;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Random;
+import java.util.TimeZone;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+
 /**
  * <p>
  * Information tuple generator with randomness.
@@ -74,6 +77,7 @@ public class InputReceiver extends BaseOperator implements InputOperator
     dayDateFormat.setTimeZone(tz);
 
   }
+
   @Override
   public void setup(Context.OperatorContext context)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
index 722a77e..2b3bb1c 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
@@ -305,25 +305,29 @@ public class MachineKey
     if (!(obj instanceof MachineKey)) {
       return false;
     }
-    MachineKey mkey = (MachineKey) obj;
+    MachineKey mkey = (MachineKey)obj;
     return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId);
   }
 
   private boolean checkIntEqual(Integer a, Integer b)
   {
-    if ((a == null) && (b == null))
+    if ((a == null) && (b == null)) {
       return true;
-    if ((a != null) && a.equals(b))
+    }
+    if ((a != null) && a.equals(b)) {
       return true;
+    }
     return false;
   }
 
   private boolean checkStringEqual(String a, String b)
   {
-    if ((a == null) && (b == null))
+    if ((a == null) && (b == null)) {
       return true;
-    if ((a != null) && a.equals(b))
+    }
+    if ((a != null) && a.equals(b)) {
       return true;
+    }
     return false;
   }
 
@@ -331,20 +335,27 @@ public class MachineKey
   public String toString()
   {
     StringBuilder sb = new StringBuilder(timeKey);
-    if (customer != null)
+    if (customer != null) {
       sb.append("|0:").append(customer);
-    if (product != null)
+    }
+    if (product != null) {
       sb.append("|1:").append(product);
-    if (os != null)
+    }
+    if (os != null) {
       sb.append("|2:").append(os);
-    if (software1 != null)
+    }
+    if (software1 != null) {
       sb.append("|3:").append(software1);
-    if (software2 != null)
+    }
+    if (software2 != null) {
       sb.append("|4:").append(software2);
-    if (software3 != null)
+    }
+    if (software3 != null) {
       sb.append("|5:").append(software3);
-    if (deviceId != null)
+    }
+    if (deviceId != null) {
       sb.append("|6:").append(deviceId);
+    }
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
index a0b2ecf..d474c5c 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
@@ -18,45 +18,49 @@
  */
 package com.datatorrent.demos.machinedata.data;
 
-import com.google.common.collect.Maps;
-
 import java.util.Map;
 
+import com.google.common.collect.Maps;
+
 /**
  * This class captures the resources whose usage is collected for each device
  * <p>ResourceType class.</p>
  *
  * @since 0.3.5
  */
-public enum ResourceType {
+public enum ResourceType
+{
 
-    CPU("cpu"), RAM("ram"), HDD("hdd");
+  CPU("cpu"), RAM("ram"), HDD("hdd");
 
-    private static Map<String, ResourceType> descToResource = Maps.newHashMap();
+  private static Map<String, ResourceType> descToResource = Maps.newHashMap();
 
-    static {
-        for (ResourceType type : ResourceType.values()) {
-            descToResource.put(type.desc, type);
-        }
+  static {
+    for (ResourceType type : ResourceType.values()) {
+      descToResource.put(type.desc, type);
     }
+  }
 
-    private String desc;
+  private String desc;
 
-    private ResourceType(String desc) {
-        this.desc = desc;
-    }
+  private ResourceType(String desc)
+  {
+    this.desc = desc;
+  }
 
-    @Override
-    public String toString() {
-        return desc;
-    }
+  @Override
+  public String toString()
+  {
+    return desc;
+  }
 
-    /**
-     * This method returns ResourceType for the given description
-     * @param desc the description
-     * @return
-     */
-    public static ResourceType getResourceTypeOf(String desc) {
-        return descToResource.get(desc);
-    }
+  /**
+   * This method returns ResourceType for the given description
+   * @param desc the description
+   * @return
+   */
+  public static ResourceType getResourceTypeOf(String desc)
+  {
+    return descToResource.get(desc);
+  }
 }