You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/01/10 18:58:16 UTC

[1/2] incubator-apex-core git commit: APEXCORE-268 #resolve #comment removed style violations from common

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 b14583c4d -> aded30c45


APEXCORE-268 #resolve #comment removed style violations from common


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/778436f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/778436f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/778436f7

Branch: refs/heads/devel-3
Commit: 778436f76a21f655d844c18b7a2956115c3853a9
Parents: 7629be8
Author: MalharJenkins <je...@datatorrent.com>
Authored: Fri Jan 8 13:49:38 2016 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Jan 8 13:49:59 2016 -0800

----------------------------------------------------------------------
 common/pom.xml                                  |  2 +-
 .../common/codec/JsonStreamCodec.java           | 13 +++--
 .../common/experimental/AppData.java            | 14 ++++--
 .../common/metric/MetricsAggregator.java        | 12 ++---
 .../common/metric/SingleMetricAggregator.java   |  1 +
 .../partitioner/StatelessPartitioner.java       | 51 +++++++++-----------
 .../auth/callback/DefaultCallbackHandler.java   | 10 ++--
 .../common/util/AsyncFSStorageAgent.java        | 20 ++++++--
 .../datatorrent/common/util/BaseOperator.java   |  2 -
 .../util/BasicContainerOptConfigurator.java     | 22 ++++-----
 .../datatorrent/common/util/FSStorageAgent.java | 42 ++++++++--------
 .../util/JacksonObjectMapperProvider.java       |  6 ++-
 .../common/util/NameableThreadFactory.java      |  4 +-
 .../datatorrent/common/util/PubSubMessage.java  |  3 +-
 .../common/util/PubSubMessageCodec.java         |  9 ++--
 .../common/util/PubSubWebSocketClient.java      | 50 ++++++++++---------
 .../common/util/SerializableObject.java         | 41 ++++------------
 .../common/codec/JsonStreamCodecTest.java       |  4 +-
 .../partitioner/StatelessPartitionerTest.java   | 14 ++++--
 .../common/util/AsyncFSStorageAgentTest.java    | 13 ++---
 .../common/util/FSStorageAgentTest.java         | 19 ++++----
 .../common/util/SerializableObjectTest.java     |  5 +-
 22 files changed, 180 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 94f317f..5c2c98f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -56,7 +56,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>114</maxAllowedViolations>
+          <logViolationsToConsole>true</logViolationsToConsole>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java b/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
index 1c3240b..a17023f 100644
--- a/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
+++ b/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
@@ -18,7 +18,9 @@
  */
 package com.datatorrent.common.codec;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Map;
 
 import org.codehaus.jackson.JsonGenerator;
@@ -65,8 +67,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
             }
 
           });
-        }
-        catch (Exception ex) {
+        } catch (Exception ex) {
           logger.error("Caught exception when instantiating codec for class {}", entry.getKey().getName(), ex);
         }
       }
@@ -80,8 +81,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
     ByteArrayInputStream bis = new ByteArrayInputStream(data.buffer, data.offset, data.length);
     try {
       return mapper.readValue(bis, Object.class);
-    }
-    catch (Exception ioe) {
+    } catch (Exception ioe) {
       throw new RuntimeException(ioe);
     }
   }
@@ -95,8 +95,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
       mapper.writeValue(bos, o);
       byte[] bytes = bos.toByteArray();
       return new Slice(bytes, 0, bytes.length);
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index 7c2a56a..bbf9753 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -18,17 +18,19 @@
  */
 package com.datatorrent.common.experimental;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
 /**
  * Interface for App Data support. Experimental only. This interface will likely change in the near future.
  *
@@ -96,6 +98,7 @@ public interface AppData
      * @return The connection url used by the AppData Query or Result operator.
      */
     public String getAppDataURL();
+
     /**
      * Returns the topic that the appdata Query or Result operator sends data to.
      * @return The topic that the appdata Query or Result operator sends data to.
@@ -110,7 +113,10 @@ public interface AppData
   @Target(ElementType.TYPE)
   @Retention(RetentionPolicy.RUNTIME)
   @Inherited
-  public @interface AppendQueryIdToTopic{boolean value() default false;}
+  public @interface AppendQueryIdToTopic
+  {
+    boolean value() default false;
+  }
 
   /**
    * Marker annotation for specifying appdata query ports.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java b/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
index 4192354..e34f74a 100644
--- a/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
+++ b/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
@@ -98,8 +98,7 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
     String aggregatorDesc;
     if (aggregatorName == null) {
       aggregatorDesc = aggregator.getClass().getName();
-    }
-    else {
+    } else {
       aggregatorDesc = aggregatorName.value();
     }
     return aggregatorDesc + aggregatorMetricSeparator + metric;
@@ -129,13 +128,13 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
    *                           be used for the result of aggregators[i].
    */
   public void addAggregators(@NotNull String metric, @NotNull SingleMetricAggregator[] aggregators,
-                             @NotNull String[] logicalMetricNames)
+      @NotNull String[] logicalMetricNames)
   {
     Preconditions.checkNotNull(metric, "metric");
     Preconditions.checkNotNull(aggregators, "aggregators");
     Preconditions.checkNotNull(logicalMetricNames, "logicalMetricNames");
-    Preconditions.checkArgument(aggregators.length == logicalMetricNames.length, "different length aggregators and" +
-      " logical names");
+    Preconditions.checkArgument(aggregators.length == logicalMetricNames.length,
+        "different length aggregators and logical names");
     addAggregatorsHelper(metric, aggregators, logicalMetricNames);
   }
 
@@ -149,8 +148,7 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
     for (int i = 0; i < aggregators.length; i++) {
 
       String resultName = (logicalMetricNames == null || logicalMetricNames[i] == null) ?
-        (aggregators.length == 1 ? metric : deriveLogicalMetricName(metric, aggregators[i]))
-        : logicalMetricNames[i];
+          (aggregators.length == 1 ? metric : deriveLogicalMetricName(metric, aggregators[i])) : logicalMetricNames[i];
 
       laggregators.add(new LogicalMetricMeta(aggregators[i], resultName));
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java b/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
index 9b568d2..dbc9b8a 100644
--- a/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
+++ b/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
@@ -19,6 +19,7 @@
 package com.datatorrent.common.metric;
 
 import java.util.Collection;
+
 /**
  * <p>SingleMetricAggregator interface.</p>
  *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index c36dd8a..165d8cf 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -19,16 +19,22 @@
 package com.datatorrent.common.partitioner;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.validation.constraints.Min;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
@@ -100,7 +106,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
     logger.debug("define partitions, partitionCount current {} requested {}", partitions.size(), newPartitionCount);
 
     //Get a partition
-    DefaultPartition<T> partition = (DefaultPartition<T>) partitions.iterator().next();
+    DefaultPartition<T> partition = (DefaultPartition<T>)partitions.iterator().next();
     Collection<Partition<T>> newPartitions;
 
     if (partitions.iterator().next().getStats() == null) {
@@ -117,16 +123,13 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
       if (inputPortList != null && !inputPortList.isEmpty()) {
         DefaultPartition.assignPartitionKeys(newPartitions, inputPortList.iterator().next());
       }
-    }
-    else {
+    } else {
       // define partitions is being called again
       if (context.getParallelPartitionCount() != 0) {
         newPartitions = repartitionParallel(partitions, context);
-      }
-      else if (partition.getPartitionKeys().isEmpty()) {
+      } else if (partition.getPartitionKeys().isEmpty()) {
         newPartitions = repartitionInputOperator(partitions);
-      }
-      else {
+      } else {
         newPartitions = repartition(partitions);
       }
     }
@@ -166,8 +169,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
           Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
           if (siblingPartition == null) {
             lowLoadPartitions.put(partitionKey & reducedMask, p);
-          }
-          else {
+          } else {
             // both of the partitions are low load, combine
             PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
             // put new value so the map gets marked as modified
@@ -178,8 +180,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
             //LOG.debug("partition keys after merge {}", siblingPartition.getPartitionKeys());
           }
         }
-      }
-      else if (load > 0) {
+      } else if (load > 0) {
         // split bottlenecks
         Map<InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
         Map.Entry<InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();
@@ -193,8 +194,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
           int key = e.getValue().partitions.iterator().next();
           int key2 = (newMask ^ e.getValue().mask) | key;
           newKeys = Sets.newHashSet(key, key2);
-        }
-        else {
+        } else {
           // assign keys to separate partitions
           newMask = e.getValue().mask;
           newKeys = e.getValue().partitions;
@@ -205,8 +205,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
           newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
           newPartitions.add(newPartition);
         }
-      }
-      else {
+      } else {
         // leave unchanged
         newPartitions.add(p);
       }
@@ -232,16 +231,13 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
       if (load < 0) {
         if (!lowLoadPartitions.isEmpty()) {
           newPartitions.add(lowLoadPartitions.remove(0));
-        }
-        else {
+        } else {
           lowLoadPartitions.add(p);
         }
-      }
-      else if (load > 0) {
+      } else if (load > 0) {
         newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
         newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
-      }
-      else {
+      } else {
         newPartitions.add(p);
       }
     }
@@ -259,7 +255,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
    * @return new adjusted partitions
    */
   public static <T extends Operator> Collection<Partition<T>> repartitionParallel(Collection<Partition<T>> partitions,
-                                                                                  PartitioningContext context)
+      PartitioningContext context)
   {
     List<Partition<T>> newPartitions = Lists.newArrayList();
     newPartitions.addAll(partitions);
@@ -273,8 +269,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
         partitionIterator.next();
         partitionIterator.remove();
       }
-    }
-    else {
+    } else {
       //Add more partitions
       T anOperator = newPartitions.iterator().next().getPartitionedInstance();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java b/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
index 5792ad5..779a156 100644
--- a/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
+++ b/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
@@ -20,14 +20,18 @@ package com.datatorrent.common.security.auth.callback;
 
 import java.io.IOException;
 
-import javax.security.auth.callback.*;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.TextOutputCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.RealmCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Component;
-
 import com.datatorrent.common.security.SecurityContext;
 
 /**
@@ -78,7 +82,7 @@ public class DefaultCallbackHandler implements CallbackHandler, Component<Securi
       PasswordCallback passcb = (PasswordCallback)callback;
       passcb.setPassword(context.getValue(SecurityContext.PASSWORD));
     } else if (callback instanceof RealmCallback) {
-      RealmCallback realmcb = (RealmCallback) callback;
+      RealmCallback realmcb = (RealmCallback)callback;
       realmcb.setText(context.getValue(SecurityContext.REALM));
     } else if (callback instanceof TextOutputCallback) {
       TextOutputCallback textcb = (TextOutputCallback)callback;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 83bbdca..788a68c 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -18,16 +18,26 @@
  */
 package com.datatorrent.common.util;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
 import java.nio.file.Files;
 import java.util.EnumSet;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -74,7 +84,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public void save(final Object object, final int operatorId, final long windowId) throws IOException
   {
-    if(syncCheckpoint){
+    if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
index 5b00e44..4601f3a 100644
--- a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
@@ -21,8 +21,6 @@ package com.datatorrent.common.util;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
 
-import com.datatorrent.common.util.SerializableObject;
-
 /**
  * Base class for operator implementations that provides empty implementations
  * for all interface methods.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java b/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
index 328ec7b..86ce23d 100644
--- a/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
@@ -63,11 +63,10 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
     for (DAG.OperatorMeta operatorMeta : operatorMetaList) {
       Map<String, Object> operatorMap = parseJvmOpts(operatorMeta.getValue(Context.OperatorContext.JVM_OPTIONS), operatorMeta.getValue(Context.OperatorContext.MEMORY_MB));
       LOG.info("property map for operator {}", operatorMap);
-      Set<String> operatorPropertySet = (Set<String>) operatorMap.get(GENERIC);
+      Set<String> operatorPropertySet = (Set<String>)operatorMap.get(GENERIC);
       if (genericProperties == null) {
         genericProperties = operatorPropertySet;
-      }
-      else {
+      } else {
         if (operatorPropertySet != null && !genericProperties.equals(operatorPropertySet)) {
           throw new AssertionError("Properties don't match: " + genericProperties + " " + operatorPropertySet);
         }
@@ -77,15 +76,15 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
     for (Map<String, Object> map : jvmOptsList) {
       String value;
       if (map.containsKey(XMX)) {
-        value = (String) map.get(XMX);
+        value = (String)map.get(XMX);
         xmx += getOptValue(value);
       }
       if (map.containsKey(XMS)) {
-        value = (String) map.get(XMS);
+        value = (String)map.get(XMS);
         xms += getOptValue(value);
       }
       if (map.containsKey(XSS)) {
-        value = (String) map.get(XSS);
+        value = (String)map.get(XSS);
         xss += getOptValue(value);
       }
     }
@@ -110,14 +109,11 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
     long result;
     if (value.endsWith("g") || value.endsWith("G")) {
       result = Long.valueOf(value.substring(0, value.length() - 1)) * GB_TO_B;
-    }
-    else if (value.endsWith("m") || value.endsWith("M")) {
+    } else if (value.endsWith("m") || value.endsWith("M")) {
       result = Long.valueOf(value.substring(0, value.length() - 1)) * MB_TO_B;
-    }
-    else if (value.endsWith("k") || value.endsWith("K")) {
+    } else if (value.endsWith("k") || value.endsWith("K")) {
       result = Long.valueOf(value.substring(0, value.length() - 1)) * KB_TO_B;
-    }
-    else {
+    } else {
       result = Long.valueOf(value);
     }
     return result;
@@ -151,7 +147,7 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
       int memoryOverhead = memory / 4;
       int heapSize = memory - memoryOverhead;
       if (memoryOverhead > 1024) {
-         heapSize = memory - 1024;
+        heapSize = memory - 1024;
       }
       map.put(XMX, heapSize + "m");
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index c2f68a0..fd4c450 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,16 +18,28 @@
  */
 package com.datatorrent.common.util;
 
-import java.io.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -35,7 +47,6 @@ import com.google.common.collect.Lists;
 
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -71,12 +82,10 @@ public class FSStorageAgent implements StorageAgent, Serializable
 
       if (pathUri.getScheme() != null) {
         fileContext = FileContext.getFileContext(pathUri, conf == null ? new Configuration() : conf);
-      }
-      else {
+      } else {
         fileContext = FileContext.getFileContext(conf == null ? new Configuration() : conf);
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -95,27 +104,23 @@ public class FSStorageAgent implements StorageAgent, Serializable
         Options.CreateOpts.CreateParent.createParent());
       store(stream, object);
       stateSaved = true;
-    }
-    catch (Throwable t) {
+    } catch (Throwable t) {
       logger.debug("while saving {} {}", operatorId, window, t);
       stateSaved = false;
       DTThrowable.rethrow(t);
-    }
-    finally {
+    } finally {
       try {
         if (stream != null) {
           stream.close();
         }
-      }
-      catch (IOException ie) {
+      } catch (IOException ie) {
         stateSaved = false;
         throw new RuntimeException(ie);
-      }
-      finally {
+      } finally {
         if (stateSaved) {
           logger.debug("Saving {}: {}", operatorId, window);
           fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window),
-            Options.Rename.OVERWRITE);
+              Options.Rename.OVERWRITE);
         }
       }
     }
@@ -130,8 +135,7 @@ public class FSStorageAgent implements StorageAgent, Serializable
     FSDataInputStream stream = fileContext.open(lPath);
     try {
       return retrieve(stream);
-    }
-    finally {
+    } finally {
       stream.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 2f1735f..7723fed 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -19,13 +19,17 @@
 package com.datatorrent.common.util;
 
 import java.io.IOException;
+
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.ext.ContextResolver;
 import javax.ws.rs.ext.Provider;
+
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.Version;
-import org.codehaus.jackson.map.*;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
 import org.codehaus.jackson.map.module.SimpleModule;
 import org.codehaus.jackson.map.ser.std.RawSerializer;
 import org.codehaus.jettison.json.JSONArray;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java b/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
index f3aa170..f0c61b5 100644
--- a/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
+++ b/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
@@ -54,9 +54,7 @@ public class NameableThreadFactory implements ThreadFactory
   @Override
   public Thread newThread(Runnable r)
   {
-    Thread t = new Thread(group, r,
-                          namePrefix + threadNumber.getAndIncrement(),
-                          0);
+    Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
     if (t.isDaemon() != this.isDaemon) {
       t.setDaemon(isDaemon);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
index 1704edb..f263d0c 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
@@ -53,7 +53,8 @@ public class PubSubMessage<T>
       return identifier;
     }
 
-    public static PubSubMessageType getPubSubMessageType(String identifier) {
+    public static PubSubMessageType getPubSubMessageType(String identifier)
+    {
       PubSubMessageType pubSubMessageType = null;
       for (PubSubMessageType value : PubSubMessageType.values()) {
         if (value.getIdentifier().equals(identifier)) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index aa84213..926fa8f 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -36,11 +36,13 @@ public class PubSubMessageCodec<T>
 
   private final ObjectMapper mapper;
 
-  public PubSubMessageCodec(ObjectMapper mapper) {
+  public PubSubMessageCodec(ObjectMapper mapper)
+  {
     this.mapper = mapper;
   }
 
-  public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException {
+  public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
+  {
     HashMap<String, Object> map = new HashMap<String, Object>();
     map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
     map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
@@ -59,7 +61,8 @@ public class PubSubMessageCodec<T>
    * @throws IOException
    */
   @SuppressWarnings({"unchecked"})
-  public PubSubMessage<T> parseMessage(String message) throws IOException {
+  public PubSubMessage<T> parseMessage(String message) throws IOException
+  {
     HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
     PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
     pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java b/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
index 58072ee..c3f5961 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
@@ -21,13 +21,12 @@ package com.datatorrent.common.util;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.ning.http.client.*;
-import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
-import com.ning.http.client.websocket.*;
-
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -36,11 +35,19 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.ning.http.client.AsyncCompletionHandler;
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
+import com.ning.http.client.AsyncHttpClientConfigBean;
+import com.ning.http.client.Cookie;
+import com.ning.http.client.Response;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
-
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -71,14 +78,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
       try {
         pubSubMessage = codec.parseMessage(message);
         PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
-      }
-      catch (JsonParseException jpe) {
+      } catch (JsonParseException jpe) {
         logger.warn("Ignoring unparseable JSON message: {}", message, jpe);
-      }
-      catch (JsonMappingException jme) {
+      } catch (JsonMappingException jme) {
         logger.warn("Ignoring JSON mapping in message: {}", message, jme);
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         onError(ex);
       }
     }
@@ -175,8 +179,7 @@ public abstract class PubSubWebSocketClient implements Component<Context>
       try {
         json.put("userName", userName);
         json.put("password", password);
-      }
-      catch (JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
       Response response = client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute().get();
@@ -201,8 +204,7 @@ public abstract class PubSubWebSocketClient implements Component<Context>
       try {
         json.put("userName", userName);
         json.put("password", password);
-      }
-      catch (JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
       client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute(new AsyncCompletionHandler<Response>()
@@ -223,9 +225,8 @@ public abstract class PubSubWebSocketClient implements Component<Context>
         }
 
       });
-    }
-    else {
-      client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()
+    } else {
+      final PubSubWebSocket webSocket = new PubSubWebSocket()
       {
         @Override
         public void onOpen(WebSocket ws)
@@ -233,8 +234,9 @@ public abstract class PubSubWebSocketClient implements Component<Context>
           connection = ws;
           super.onOpen(ws);
         }
-
-      }).build());
+      };
+      client.prepareGet(uri.toString()).execute(
+          new WebSocketUpgradeHandler.Builder().addWebSocketListener(webSocket).build());
     }
   }
 
@@ -289,11 +291,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
     Throwable t = throwable.get();
     if (t instanceof IOException) {
       throw (IOException)t;
-    }
-    else {
+    } else {
       DTThrowable.rethrow(t);
     }
   }
+
   /**
    * <p>publish.</p>
    *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/SerializableObject.java b/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
index 203399f..caed968 100644
--- a/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
+++ b/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
@@ -22,8 +22,8 @@ import java.io.ObjectStreamException;
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,37 +58,21 @@ public class SerializableObject implements Serializable
       Constructor<? extends SerializableObject> constructor = this.getClass().getConstructor(this.getClass());
       try {
         constructor.setAccessible(true);
-      }
-      catch (SecurityException ex) {
+      } catch (SecurityException ex) {
         logger.warn("Accessing copy constructor {} failed.", constructor, ex);
       }
       try {
         return constructor.newInstance(this);
-      }
-      catch (InstantiationException ex) {
-        throw new RuntimeException("Instantiation using copy constructor failed!", ex);
-      }
-      catch (IllegalAccessException ex) {
+      } catch (ReflectiveOperationException | IllegalArgumentException ex) {
         throw new RuntimeException("Instantiation using copy constructor failed!", ex);
       }
-      catch (IllegalArgumentException ex) {
-        throw new RuntimeException("Instantiation using copy constructor failed!", ex);
-      }
-      catch (InvocationTargetException ex) {
-        throw new RuntimeException("Instantiation using copy constructor failed!", ex);
-      }
-    }
-    catch (NoSuchMethodException snme) {
+    } catch (NoSuchMethodException snme) {
       logger.debug("No copy constructor detected for class {}, trying default constructor.", this.getClass().getSimpleName());
       try {
         SerializableObject newInstance = this.getClass().newInstance();
         transferStateTo(newInstance);
         return newInstance;
-      }
-      catch (IllegalAccessException ex) {
-        throw new RuntimeException("Deserialization using default constructor failed!", ex);
-      }
-      catch (InstantiationException ex) {
+      } catch (ReflectiveOperationException ex) {
         throw new RuntimeException("Deserialization using default constructor failed!", ex);
       }
     }
@@ -108,27 +92,20 @@ public class SerializableObject implements Serializable
         if (!(Modifier.isFinal(modifiers) && Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers))) {
           try {
             field.setAccessible(true);
-          }
-          catch (SecurityException ex) {
+          } catch (SecurityException ex) {
             logger.warn("Cannot set field {} accessible.", field, ex);
           }
           try {
             field.set(dest, field.get(this));
-          }
-          catch (IllegalArgumentException ex) {
+          } catch (IllegalArgumentException ex) {
             throw new RuntimeException("Getter/Setter argument failed using reflection on " + field, ex);
-          }
-          catch (IllegalAccessException ex) {
+          } catch (IllegalAccessException ex) {
             throw new RuntimeException("Getter/Setter access failed using reflection on " + field, ex);
           }
           if (!field.getType().isPrimitive()) {
             try {
               field.set(this, null);
-            }
-            catch (IllegalArgumentException ex) {
-              logger.warn("Failed to set field {} to null; generally it's harmless, but with reference counted data structure this may be an issue.", field, ex);
-            }
-            catch (IllegalAccessException ex) {
+            } catch (IllegalArgumentException | IllegalAccessException ex) {
               logger.warn("Failed to set field {} to null; generally it's harmless, but with reference counted data structure this may be an issue.", field, ex);
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
index d99a186..b31009e 100644
--- a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
+++ b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
@@ -18,13 +18,15 @@
  */
 package com.datatorrent.common.codec;
 
-import com.datatorrent.api.StringCodec;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.api.StringCodec;
+
 public class JsonStreamCodecTest
 {
   static class PojoClass

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index 25e5fcc..687957c 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -25,9 +25,13 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+
 import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Partitioner.Partition;
 import com.datatorrent.api.StringCodec.Object2String;
 
@@ -97,7 +101,7 @@ public class StatelessPartitionerTest
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
     Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size());
 
-    for(Partition<DummyOperator> partition: newPartitions) {
+    for (Partition<DummyOperator> partition : newPartitions) {
       Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
     }
   }
@@ -115,7 +119,7 @@ public class StatelessPartitionerTest
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
     Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size());
 
-    for(Partition<DummyOperator> partition: newPartitions) {
+    for (Partition<DummyOperator> partition : newPartitions) {
       Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
     }
   }
@@ -138,7 +142,7 @@ public class StatelessPartitionerTest
     partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
-      new PartitioningContextImpl(null, 5));
+        new PartitioningContextImpl(null, 5));
     Assert.assertEquals("after partition", 5, newPartitions.size());
   }
 
@@ -155,7 +159,7 @@ public class StatelessPartitionerTest
     }
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
-      new PartitioningContextImpl(null, 1));
+        new PartitioningContextImpl(null, 1));
     Assert.assertEquals("after partition", 1, newPartitions.size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
index e2522cb..e644846 100644
--- a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -22,15 +22,16 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Attribute;
@@ -85,7 +86,7 @@ public class AsyncFSStorageAgentTest
     testMeta.storageAgent.save(data, 1, 1);
     testMeta.storageAgent.copyToHDFS(1, 1);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
     Assert.assertEquals("dataOf1", data, decoded);
   }
 
@@ -107,10 +108,10 @@ public class AsyncFSStorageAgentTest
     testMeta.storageAgent.save(dataOf2, 2, 1);
     testMeta.storageAgent.copyToHDFS(2, 1);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+    Map<Integer, String> decoded1 = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
 
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+    Map<Integer, String> decoded2 = (Map<Integer, String>)testMeta.storageAgent.load(2, 1);
     Assert.assertEquals("data of 1", dataOf1, decoded1);
     Assert.assertEquals("data of 2", dataOf2, decoded2);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
index e9fc1ea..0d6e38b 100644
--- a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
@@ -22,15 +22,16 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Attribute;
@@ -50,8 +51,7 @@ public class FSStorageAgentTest
       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
       try {
         FileUtils.forceMkdir(new File("target/" + description.getClassName()));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
       storageAgent = new FSStorageAgent(applicationPath, null);
@@ -65,8 +65,7 @@ public class FSStorageAgentTest
     {
       try {
         FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -84,7 +83,7 @@ public class FSStorageAgentTest
     data.put(3, "three");
     testMeta.storageAgent.save(data, 1, 1);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
     Assert.assertEquals("dataOf1", data, decoded);
   }
 
@@ -104,10 +103,10 @@ public class FSStorageAgentTest
     testMeta.storageAgent.save(dataOf1, 1, 1);
     testMeta.storageAgent.save(dataOf2, 2, 1);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+    Map<Integer, String> decoded1 = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
 
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+    Map<Integer, String> decoded2 = (Map<Integer, String>)testMeta.storageAgent.load(2, 1);
     Assert.assertEquals("data of 1", dataOf1, decoded1);
     Assert.assertEquals("data of 2", dataOf2, decoded2);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index c44b953..97debe3 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -24,13 +24,14 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  *
  */
@@ -124,4 +125,4 @@ public class SerializableObjectTest
     assertEquals("Serialized Deserialized Objects", pre, post);
   }
 
-}
\ No newline at end of file
+}


[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-268' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-268

Posted by vr...@apache.org.
Merge branch 'APEXCORE-268' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-268


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aded30c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aded30c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aded30c4

Branch: refs/heads/devel-3
Commit: aded30c45cc7bd2dba08243c5d0536fe9fe13bb1
Parents: b14583c 778436f
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Jan 10 09:52:05 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Jan 10 09:52:05 2016 -0800

----------------------------------------------------------------------
 common/pom.xml                                  |  2 +-
 .../common/codec/JsonStreamCodec.java           | 13 +++--
 .../common/experimental/AppData.java            | 14 ++++--
 .../common/metric/MetricsAggregator.java        | 12 ++---
 .../common/metric/SingleMetricAggregator.java   |  1 +
 .../partitioner/StatelessPartitioner.java       | 51 +++++++++-----------
 .../auth/callback/DefaultCallbackHandler.java   | 10 ++--
 .../common/util/AsyncFSStorageAgent.java        | 20 ++++++--
 .../datatorrent/common/util/BaseOperator.java   |  2 -
 .../util/BasicContainerOptConfigurator.java     | 22 ++++-----
 .../datatorrent/common/util/FSStorageAgent.java | 42 ++++++++--------
 .../util/JacksonObjectMapperProvider.java       |  6 ++-
 .../common/util/NameableThreadFactory.java      |  4 +-
 .../datatorrent/common/util/PubSubMessage.java  |  3 +-
 .../common/util/PubSubMessageCodec.java         |  9 ++--
 .../common/util/PubSubWebSocketClient.java      | 50 ++++++++++---------
 .../common/util/SerializableObject.java         | 41 ++++------------
 .../common/codec/JsonStreamCodecTest.java       |  4 +-
 .../partitioner/StatelessPartitionerTest.java   | 14 ++++--
 .../common/util/AsyncFSStorageAgentTest.java    | 13 ++---
 .../common/util/FSStorageAgentTest.java         | 19 ++++----
 .../common/util/SerializableObjectTest.java     |  5 +-
 22 files changed, 180 insertions(+), 177 deletions(-)
----------------------------------------------------------------------