You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/04/09 20:39:38 UTC

[2/2] apex-core git commit: APEXCORE-691 Use type inference for generic instance creation closes #505

APEXCORE-691 Use type inference for generic instance creation
closes #505


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/aa81bea3
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/aa81bea3
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/aa81bea3

Branch: refs/heads/master
Commit: aa81bea306aed51fd881d97ce62a01537eeb2003
Parents: 9383613
Author: Apex Dev <de...@apex.apache.org>
Authored: Fri Apr 7 10:08:45 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Apr 9 13:38:47 2017 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/api/AffinityRule.java  |  2 +-
 .../java/com/datatorrent/api/Attribute.java     |  6 +-
 .../main/java/com/datatorrent/api/Context.java  | 76 ++++++++++----------
 .../java/com/datatorrent/api/StringCodec.java   |  8 +--
 .../org/apache/apex/api/YarnAppLauncher.java    |  6 +-
 .../com/datatorrent/api/AttributeMapTest.java   |  2 +-
 .../bufferserver/internal/DataList.java         | 10 +--
 .../bufferserver/internal/LogicalNode.java      |  4 +-
 .../datatorrent/bufferserver/server/Server.java |  4 +-
 .../datatorrent/bufferserver/util/System.java   |  2 +-
 .../packet/SubscribeRequestTupleTest.java       |  2 +-
 .../bufferserver/support/Subscriber.java        |  2 +-
 .../partitioner/StatelessPartitioner.java       | 18 ++---
 .../common/security/SecurityContext.java        |  6 +-
 .../common/util/DefaultDelayOperator.java       |  2 +-
 .../util/JacksonObjectMapperProvider.java       |  6 +-
 .../common/util/PubSubMessageCodec.java         | 14 ++--
 .../partitioner/StatelessPartitionerTest.java   | 18 ++---
 .../common/util/SerializableObjectTest.java     |  4 +-
 .../BlacklistBasedResourceRequestHandler.java   |  4 +-
 .../stram/ResourceRequestHandler.java           |  4 +-
 .../stram/client/PermissionsInfo.java           |  8 +--
 .../stram/client/StramClientUtils.java          |  2 +-
 .../stram/engine/OperatorContext.java           |  4 +-
 .../stram/plan/logical/LogicalPlan.java         | 10 +--
 .../stram/plan/physical/PhysicalPlan.java       |  4 +-
 .../stram/util/PubSubWebSocketClient.java       |  4 +-
 .../stram/webapp/asm/CompactUtil.java           |  2 +-
 .../com/datatorrent/stram/webapp/asm/Type.java  |  4 +-
 .../com/datatorrent/stram/CheckpointTest.java   |  4 +-
 .../stram/GenericOperatorPropertyCodecTest.java |  2 +-
 .../com/datatorrent/stram/PartitioningTest.java | 24 +++----
 .../com/datatorrent/stram/StreamCodecTest.java  |  8 +--
 .../stram/StreamingContainerManagerTest.java    |  4 +-
 .../datatorrent/stram/cli/ApexCliMiscTest.java  |  2 +-
 .../com/datatorrent/stram/cli/ApexCliTest.java  |  2 +-
 .../codec/DefaultStatefulStreamCodecTest.java   |  8 +--
 .../stram/engine/AtMostOnceTest.java            |  2 +-
 .../stram/engine/GenericNodeTest.java           | 12 ++--
 .../stram/engine/InputOperatorTest.java         | 10 +--
 .../com/datatorrent/stram/engine/NodeTest.java  |  2 +-
 .../stram/engine/ProcessingModeTests.java       |  8 +--
 .../datatorrent/stram/engine/SliderTest.java    |  2 +-
 .../com/datatorrent/stram/engine/StatsTest.java |  6 +-
 .../stram/engine/StreamingContainerTest.java    |  2 +-
 .../engine/TestGeneratorInputOperator.java      |  4 +-
 .../stram/engine/WindowGeneratorTest.java       |  2 +-
 .../moduleexperiment/InjectConfigTest.java      |  2 +-
 .../stram/plan/StreamPersistanceTests.java      |  8 +--
 .../datatorrent/stram/plan/TestPlanContext.java |  2 +-
 .../logical/LogicalPlanConfigurationTest.java   |  6 +-
 .../stram/plan/logical/LogicalPlanTest.java     | 12 ++--
 .../plan/logical/module/ModuleAppTest.java      | 10 +--
 .../logical/module/TestModuleExpansion.java     |  2 +-
 .../stram/plan/logical/module/TestModules.java  |  4 +-
 .../stram/plan/physical/PhysicalPlanTest.java   | 26 +++----
 .../stream/BufferServerSubscriberTest.java      |  2 +-
 .../stram/stream/FastPublisherTest.java         |  2 +-
 .../stram/stream/FastStreamTest.java            |  2 +-
 .../stram/stream/InlineStreamTest.java          |  8 +--
 .../stram/stream/OiOEndWindowTest.java          |  4 +-
 .../datatorrent/stram/stream/OiOStreamTest.java | 14 ++--
 .../stram/stream/SocketStreamTest.java          |  2 +-
 .../support/ManualScheduledExecutorService.java |  2 +-
 .../stram/support/StramTestSupport.java         |  6 +-
 .../stram/util/StablePriorityQueueTest.java     |  4 +-
 .../stram/webapp/OperatorDiscoveryTest.java     | 26 +++----
 .../stram/webapp/StramWebServicesTest.java      |  6 +-
 .../stram/webapp/TypeDiscoveryTest.java         | 12 ++--
 69 files changed, 252 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/AffinityRule.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java
index 5e10ccd..304c086 100644
--- a/api/src/main/java/com/datatorrent/api/AffinityRule.java
+++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java
@@ -92,7 +92,7 @@ public class AffinityRule implements Serializable
   public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators)
   {
     this(type, locality, relaxLocality);
-    LinkedList<String> operators = new LinkedList<String>();
+    LinkedList<String> operators = new LinkedList<>();
     if (firstOperator != null && otherOperators.length >= 1) {
       operators.add(firstOperator);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 2efc84f..821ecb2 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -236,11 +236,11 @@ public class Attribute<T> implements Serializable
      */
     public static class AttributeInitializer
     {
-      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>();
+      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
 
       public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
       {
-        Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>();
+        Map<Attribute<Object>, Object> result = new HashMap<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@@ -273,7 +273,7 @@ public class Attribute<T> implements Serializable
         if (map.containsKey(clazz)) {
           return 0;
         }
-        Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
+        Set<Attribute<Object>> set = new HashSet<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 3d3cffe..94022ff 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -121,17 +121,17 @@ public interface Context
     /**
      * Number of tuples the poll buffer can cache without blocking the input stream to the port.
      */
-    Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024);
+    Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024);
     /**
      * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container.
      * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may
      * not be adhered to.
      */
-    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64);
+    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64);
     /**
      * Poll period in milliseconds when the port buffer reaches its limits.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge.
      * Can be used to form parallel partitions that span a group of operators.
@@ -139,7 +139,7 @@ public interface Context
      * If multiple ports of an operator have the setting, incoming streams must track back to
      * a common root partition, i.e. the operator join forks of the same origin.
      */
-    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false);
+    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false);
     /**
      * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the
      * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions
@@ -147,7 +147,7 @@ public interface Context
      * network I/O or other resource requirement for each unifier container (depends on the specific functionality of
      * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck.
      */
-    Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning
@@ -158,16 +158,16 @@ public interface Context
      * the inputs. In this case the default unifier behavior can be specified on the output port and individual
      * exceptions can be specified on the corresponding input ports.
      */
-    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE);
+    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * Whether the output is unified.
      * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified.
      */
-    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false);
+    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false);
     /**
      * Provide the codec which can be used to serialize or deserialize the data
      * that can be received on the port. If it is unspecified the engine may use
@@ -193,13 +193,13 @@ public interface Context
      * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state
      * is recovered.
      */
-    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
+    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID);
     /**
      * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
      * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds.
      * Default value is 10 milliseconds.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * The maximum number of attempts to restart a failing operator before shutting down the application.
      * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the
@@ -218,15 +218,15 @@ public interface Context
      * by the engine. The attribute is ignored when the operator was already declared stateless through the
      * {@link Stateless} annotation.
      */
-    Attribute<Boolean> STATELESS = new Attribute<Boolean>(false);
+    Attribute<Boolean> STATELESS = new Attribute<>(false);
     /**
      * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers.
      */
-    Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MEMORY_MB = new Attribute<>(1024);
     /**
      * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers.
      */
-    Attribute<Integer> VCORES = new Attribute<Integer>(0);
+    Attribute<Integer> VCORES = new Attribute<>(0);
 
     /**
      * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here.
@@ -235,7 +235,7 @@ public interface Context
     /**
      * Attribute of the operator that tells the platform how many streaming windows make 1 application window.
      */
-    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1);
     /**
      * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is
      * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
@@ -251,7 +251,7 @@ public interface Context
      * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing
      * will be done at application window boundary.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1);
     /**
      * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity).
      * For example, the user may wish to specify a locality constraint for an input operator relative to its data source.
@@ -274,18 +274,18 @@ public interface Context
      * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators
      * should be specified as AT_MOST_ONCE otherwise it will result in an error.
      */
-    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE);
+    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE);
     /**
      * Timeout to identify stalled processing, specified as count of streaming windows. If the last processed
      * window does not advance within the specified timeout count, the operator will be considered stuck and the
      * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue,
      * blocking operator logic, etc.
      */
-    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120);
+    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute.
      * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the
@@ -348,7 +348,7 @@ public interface Context
      * Name under which the application will be shown in the resource manager.
      * If not set, the default is the configuration Java class or property file name.
      */
-    Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name");
+    Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name");
     /**
      * URL to the application's documentation.
      */
@@ -387,7 +387,7 @@ public interface Context
     /**
      * Dump extra debug information in launcher, master and containers.
      */
-    Attribute<Boolean> DEBUG = new Attribute<Boolean>(false);
+    Attribute<Boolean> DEBUG = new Attribute<>(false);
     /**
      * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here.
      */
@@ -396,20 +396,20 @@ public interface Context
      * The amount of memory to be requested for the application master. Not used in local mode.
      * Default value is 1GB.
      */
-    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024);
     /**
      * Where to spool the data once the buffer server capacity is reached.
      */
-    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true);
+    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true);
     /**
      * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms.
      */
-    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500);
+    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500);
     /**
      * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator
      * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60);
     /**
      * The path to store application dependencies, recording and other generated files for application master and containers.
      */
@@ -418,13 +418,13 @@ public interface Context
      * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024);
     /**
      * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file
      * is created and the tuples start getting stored in the new file. Default value is 30hrs.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000);
     /**
      * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration.
      */
@@ -432,7 +432,7 @@ public interface Context
     /**
      * Whether or not gateway is expecting SSL connection.
      */
-    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false);
+    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false);
     /**
      * The username for logging in to the gateway, if authentication is enabled.
      */
@@ -448,48 +448,48 @@ public interface Context
     /**
      * Maximum number of simultaneous heartbeat connections to process. Default value is 30.
      */
-    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30);
+    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30);
     /**
      * How frequently should operators heartbeat to stram. Recommended setting is
      * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms.
      */
-    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000);
+    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000);
     /**
      * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart.
      * Default value is 30s.
      */
-    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000);
+    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000);
     /**
      * Timeout for allocating container resources. Default value is 60s.
      */
-    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE);
     /**
      * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when
      * the metrics are available from all operators for a window. If the information is not available from all operators then
      * the window is pending. When the number of pending windows reaches this limit the information for the oldest window
      * is purged. Default value is 1000 windows.
      */
-    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000);
+    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000);
     /**
      * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false.
      */
-    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false);
+    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false);
     /**
      * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or
      * equal to the throughput calculation interval. The default value is 10s.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000);
     /**
      * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used
      * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000);
     /**
      * The number of samples to use when using RPC latency to compensate for clock skews and network latency when
      * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100
      * samples.
      */
-    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100);
+    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100);
     /**
      * The agent which can be used to find the jvm options for the container.
      */
@@ -511,12 +511,12 @@ public interface Context
      * blacklisting of nodes by application master
      * Blacklisting for nodes is disabled for the default value
      */
-    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * The amount of time to wait before removing failed nodes from blacklist
      */
-    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000));
+    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000));
 
     /**
      * Affinity rules for specifying affinity and anti-affinity between logical operators

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/StringCodec.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index d4a0a41..fa8ab23 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -302,7 +302,7 @@ public interface StringCodec<T>
           return clazz.getConstructor(String.class).newInstance(parts[1]);
         } else {
           T object = clazz.getConstructor(String.class).newInstance(parts[1]);
-          HashMap<String, String> hashMap = new HashMap<String, String>();
+          HashMap<String, String> hashMap = new HashMap<>();
           for (int i = 2; i < parts.length; i++) {
             String[] keyValPair = parts[i].split(propertySeparator, 2);
             hashMap.put(keyValPair[0], keyValPair[1]);
@@ -365,11 +365,11 @@ public interface StringCodec<T>
       }
 
       if (string.isEmpty()) {
-        return new HashMap<K, V>();
+        return new HashMap<>();
       }
 
       String[] parts = string.split(separator);
-      HashMap<K, V> map = new HashMap<K, V>();
+      HashMap<K, V> map = new HashMap<>();
       for (String part : parts) {
         String[] kvpair = part.split(equal, 2);
         map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1]));
@@ -433,7 +433,7 @@ public interface StringCodec<T>
       }
 
       String[] parts = string.split(separator);
-      ArrayList<T> arrayList = new ArrayList<T>(parts.length);
+      ArrayList<T> arrayList = new ArrayList<>(parts.length);
       for (String part : parts) {
         arrayList.add(codec.fromString(part));
       }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
index 82cf50e..8ff0205 100644
--- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -36,17 +36,17 @@ public abstract class YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> e
   /**
    * Parameter to specify extra jars for launch.
    */
-  public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the previous application id to use to resume launch from.
    */
-  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the queue name to use for launch.
    */
-  public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String());
 
   static {
     Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
index fcb1809..b463619 100644
--- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
+++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
@@ -51,7 +51,7 @@ public class AttributeMapTest
 
   interface iface
   {
-    Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello);
+    Attribute<Greeting> greeting = new Attribute<>(Greeting.hello);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 84999fa..d08b9fc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -336,14 +336,14 @@ public class DataList
   {
     all_listeners.add(dl);
     //logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         HashSet<DataListener> set;
         if (listeners.containsKey(partition)) {
           set = listeners.get(partition);
         } else {
-          set = new HashSet<DataListener>();
+          set = new HashSet<>();
           listeners.put(partition, set);
         }
         set.add(dl);
@@ -353,7 +353,7 @@ public class DataList
       if (listeners.containsKey(DataListener.NULL_PARTITION)) {
         set = listeners.get(DataListener.NULL_PARTITION);
       } else {
-        set = new HashSet<DataListener>();
+        set = new HashSet<>();
         listeners.put(DataListener.NULL_PARTITION, set);
       }
 
@@ -363,7 +363,7 @@ public class DataList
 
   public void removeDataListener(DataListener dl)
   {
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         if (listeners.containsKey(partition)) {
@@ -459,7 +459,7 @@ public class DataList
 
     // When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it.
     Block b = first;
-    Map<Block, Integer> indices = new HashMap<Block, Integer>();
+    Map<Block, Integer> indices = new HashMap<>();
     int i = 0;
     while (b != null) {
       indices.put(b, i++);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 08a483a..b06e60a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -71,8 +71,8 @@ public class LogicalNode implements DataListener
     this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
-    this.physicalNodes = new HashSet<PhysicalNode>();
-    this.partitions = new HashSet<BitVector>();
+    this.physicalNodes = new HashSet<>();
+    this.partitions = new HashSet<>();
     this.iterator = iterator;
     this.skipWindowId = skipWindowId;
     this.eventloop = eventloop;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7ac518b..8a56b51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -260,7 +260,7 @@ public class Server extends AbstractServer
   }
 
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
-  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
+  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
@@ -883,7 +883,7 @@ public class Server extends AbstractServer
         }
       }
 
-      ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
+      ArrayList<LogicalNode> list = new ArrayList<>();
       String publisherIdentifier = datalist.getIdentifier();
       Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
       while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 124cc5f..000ce00 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -31,7 +31,7 @@ import com.datatorrent.netlet.EventLoop;
  */
 public class System
 {
-  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>();
+  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<>();
 
   public static void startup(String identifier)
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index f7b8829..371f98a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -42,7 +42,7 @@ public class SubscribeRequestTupleTest
     String down_type = "SubscriberId/StreamType";
     String upstream_id = "PublisherId";
     int mask = 7;
-    ArrayList<Integer> partitions = new ArrayList<Integer>();
+    ArrayList<Integer> partitions = new ArrayList<>();
     partitions.add(5);
     long startingWindowId = 0xcafebabe00000078L;
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 3c0cb0e..d6e9b1a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -32,7 +32,7 @@ import com.datatorrent.bufferserver.packet.Tuple;
  */
 public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber
 {
-  public final ArrayList<Object> resetPayloads = new ArrayList<Object>();
+  public final ArrayList<Object> resetPayloads = new ArrayList<>();
   public AtomicInteger tupleCount = new AtomicInteger(0);
   public WindowIdHolder firstPayload;
   public WindowIdHolder lastPayload;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index 165d8cf..d77b1ae 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -114,7 +114,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
       newPartitions = Lists.newArrayList();
 
       for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
-        newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
       }
 
       // partition the stream that was first connected in the DAG and send full data to remaining input ports
@@ -156,8 +156,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
    */
   public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -201,7 +201,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
         }
 
         for (int key: newKeys) {
-          Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+          Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
           newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
           newPartitions.add(newPartition);
         }
@@ -224,8 +224,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
    */
   public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    List<Partition<T>> lowLoadPartitions = new ArrayList<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -235,8 +235,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
           lowLoadPartitions.add(p);
         }
       } else if (load > 0) {
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
       } else {
         newPartitions.add(p);
       }
@@ -274,7 +274,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
       T anOperator = newPartitions.iterator().next().getPartitionedInstance();
 
       while (morePartitionsToCreate-- > 0) {
-        DefaultPartition<T> partition = new DefaultPartition<T>(anOperator);
+        DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
         newPartitions.add(partition);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
index dccd7b7..3dc4dda 100644
--- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
+++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
@@ -32,17 +32,17 @@ public interface SecurityContext extends Context
   /**
    * Attribute for the user name for login.
    */
-  Attribute<String> USER_NAME = new Attribute<String>((String)null);
+  Attribute<String> USER_NAME = new Attribute<>((String)null);
 
   /**
    * Attribute for the password for login.
    */
 
-  Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null);
+  Attribute<char[]> PASSWORD = new Attribute<>((char[])null);
 
   /**
    * Attribute for the realm for login.
    */
-  Attribute<String> REALM = new Attribute<String>((String)null);
+  Attribute<String> REALM = new Attribute<>((String)null);
 
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
index ca7490d..f90a888 100644
--- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -48,7 +48,7 @@ public class DefaultDelayOperator<T> extends BaseOperator implements Operator.De
     }
   };
 
-  public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+  public transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
 
   protected List<T> lastWindowTuples = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 7723fed..ef837a8 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -55,9 +55,9 @@ public class JacksonObjectMapperProvider implements ContextResolver<ObjectMapper
     this.objectMapper = new ObjectMapper();
     objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
     objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
-    module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class));
+    module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class));
     objectMapper.registerModule(module);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index 63d1646..af5e10e 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -53,7 +53,7 @@ public class PubSubMessageCodec<T>
    */
   public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.PUBLISH);
     pubSubMessage.setTopic(topic);
     pubSubMessage.setData(data);
@@ -72,7 +72,7 @@ public class PubSubMessageCodec<T>
    */
   public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -90,7 +90,7 @@ public class PubSubMessageCodec<T>
    */
   public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -108,7 +108,7 @@ public class PubSubMessageCodec<T>
    */
   public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -126,7 +126,7 @@ public class PubSubMessageCodec<T>
    */
   public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -135,7 +135,7 @@ public class PubSubMessageCodec<T>
 
   public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
   {
-    HashMap<String, Object> map = new HashMap<String, Object>();
+    HashMap<String, Object> map = new HashMap<>();
     map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
     map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
     T data = pubSubMessage.getData();
@@ -156,7 +156,7 @@ public class PubSubMessageCodec<T>
   public PubSubMessage<T> parseMessage(String message) throws IOException
   {
     HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
     pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY));
     pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY));

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index e7c4887..2e48f54 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -41,7 +41,7 @@ public class StatelessPartitionerTest
 
   public static class DummyOperator implements Operator
   {
-    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     private Integer value;
 
@@ -93,10 +93,10 @@ public class StatelessPartitionerTest
   public void partition1Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -111,10 +111,10 @@ public class StatelessPartitionerTest
   public void partition5Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5);
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -137,10 +137,10 @@ public class StatelessPartitionerTest
   public void testParallelPartitionScaleUP()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+    partitions.add(new DefaultPartition<>(dummyOperator));
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
         new PartitioningContextImpl(null, 5));
@@ -151,12 +151,12 @@ public class StatelessPartitionerTest
   public void testParallelPartitionScaleDown()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
 
     for (int i = 5; i-- > 0; ) {
-      partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+      partitions.add(new DefaultPartition<>(dummyOperator));
     }
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index 97debe3..79fdac7 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -50,7 +50,7 @@ public class SerializableObjectTest
       }
 
     };
-    public final transient OutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient OutputPort<T> output = new DefaultOutputPort<>();
     private int i;
 
     public void setI(int i)
@@ -109,7 +109,7 @@ public class SerializableObjectTest
   @Test
   public void testReadResolve() throws Exception
   {
-    SerializableOperator<Object> pre = new SerializableOperator<Object>();
+    SerializableOperator<Object> pre = new SerializableOperator<>();
     pre.setI(10);
 
     FileOutputStream fos = new FileOutputStream(filename);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 53d91a5..80314c7 100644
--- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -73,7 +73,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
       for (ContainerRequest cr : requests) {
         ContainerStartRequest csr = hostSpecificRequests.get(cr);
         ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
-        MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
+        MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr);
         requestedResources.put(csr, pair);
         containerRequests.add(newCr);
         hostSpecificRequests.remove(cr);
@@ -91,7 +91,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
         for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) {
           ContainerRequest cr = entry.getKey();
           ContainerStartRequest csr = entry.getValue();
-          MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+          MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
           requestedResources.put(csr, pair);
           containerRequests.add(cr);
         }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index e7f9672..45206bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -102,7 +102,7 @@ public class ResourceRequestHandler
    */
   public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
   {
-    MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+    MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
     requestedResources.put(csr, pair);
     containerRequests.add(cr);
   }
@@ -164,7 +164,7 @@ public class ResourceRequestHandler
 
   public List<String> getNodesExceptHost(List<String> hostNames)
   {
-    List<String> nodesList = new ArrayList<String>();
+    List<String> nodesList = new ArrayList<>();
     Set<String> hostNameSet = Sets.newHashSet();
     hostNameSet.addAll(hostNames);
     for (String host : nodeReportMap.keySet()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
index 3a61ee6..b374447 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
@@ -32,10 +32,10 @@ import org.codehaus.jettison.json.JSONObject;
 public class PermissionsInfo
 {
 
-  private final Set<String> readOnlyRoles = new TreeSet<String>();
-  private final Set<String> readOnlyUsers = new TreeSet<String>();
-  private final Set<String> readWriteRoles = new TreeSet<String>();
-  private final Set<String> readWriteUsers = new TreeSet<String>();
+  private final Set<String> readOnlyRoles = new TreeSet<>();
+  private final Set<String> readOnlyUsers = new TreeSet<>();
+  private final Set<String> readWriteRoles = new TreeSet<>();
+  private final Set<String> readWriteUsers = new TreeSet<>();
   private boolean readOnlyEveryone = false;
   private boolean readWriteEveryone = false;
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 050729d..8076d4a 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -271,7 +271,7 @@ public class StramClientUtils
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
-      return new Token<RMDelegationTokenIdentifier>(
+      return new Token<>(
           rmDelegationToken.getIdentifier().array(),
           rmDelegationToken.getPassword().array(),
           new Text(rmDelegationToken.getKind()),

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 7113280..284aefb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -50,8 +50,8 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
   private final int id;
   private final String name;
   // the size of the circular queue should be configurable. hardcoded to 1024 for now.
-  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
-  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
+  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
+  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
   public final boolean stateless;
   private int windowsFromCheckpoint;
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 401eea9..62c4fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -760,7 +760,7 @@ public class LogicalPlan implements Serializable, DAG
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
         StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
-        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator);
         persistOperatorPortMeta.setStreamCodec(codec);
       }
     }
@@ -1907,14 +1907,14 @@ public class LogicalPlan implements Serializable, DAG
     HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>();
     HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>();
 
-    List<String> operatorNames = new ArrayList<String>();
+    List<String> operatorNames = new ArrayList<>();
 
     for (OperatorMeta operator : getAllOperators()) {
       operatorNames.add(operator.getName());
-      Set<String> containerSet = new HashSet<String>();
+      Set<String> containerSet = new HashSet<>();
       containerSet.add(operator.getName());
       containerAffinities.put(operator.getName(), containerSet);
-      Set<String> nodeSet = new HashSet<String>();
+      Set<String> nodeSet = new HashSet<>();
       nodeSet.add(operator.getName());
       nodeAffinities.put(operator.getName(), nodeSet);
 
@@ -2073,7 +2073,7 @@ public class LogicalPlan implements Serializable, DAG
    */
   public void convertRegexToList(List<String> operatorNames, AffinityRule rule)
   {
-    List<String> operators = new LinkedList<String>();
+    List<String> operators = new LinkedList<>();
     Pattern p = Pattern.compile(rule.getOperatorRegex());
     for (String name : operatorNames) {
       if (p.matcher(name).matches()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..a1da94a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -477,13 +477,13 @@ public class PhysicalPlan implements Serializable
     // Log container anti-affinity
     if (LOG.isDebugEnabled()) {
       for (PTContainer container : containers) {
-        List<String> antiOperators = new ArrayList<String>();
+        List<String> antiOperators = new ArrayList<>();
         for (PTContainer c : container.getStrictAntiPrefs()) {
           for (PTOperator operator : c.getOperators()) {
             antiOperators.add(operator.getName());
           }
         }
-        List<String> containerOperators = new ArrayList<String>();
+        List<String> containerOperators = new ArrayList<>();
         for (PTOperator operator : container.getOperators()) {
           containerOperators.add(operator.getName());
         }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@ public abstract class PubSubWebSocketClient implements Component<Context>
    */
   public PubSubWebSocketClient()
   {
-    throwable = new AtomicReference<Throwable>();
+    throwable = new AtomicReference<>();
     ioThreadMultiplier = 1;
     mapper = (new JacksonObjectMapperProvider()).getContext(null);
-    codec = new PubSubMessageCodec<Object>(mapper);
+    codec = new PubSubMessageCodec<>(mapper);
 
     AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
     config.setIoThreadMultiplier(ioThreadMultiplier);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@ public class CompactUtil
     List<CompactAnnotationNode> annotations = new LinkedList<>();
     for (Object visibleAnnotation : fn.visibleAnnotations) {
       CompactAnnotationNode node = new CompactAnnotationNode();
-      Map<String, Object> annotationMap = new HashMap<String, Object>();
+      Map<String, Object> annotationMap = new HashMap<>();
       if (visibleAnnotation instanceof AnnotationNode) {
         AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
         if (annotation.desc.contains("InputPortFieldAnnotation")

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@ public interface Type
 
     char boundChar;
 
-    ArrayList<Type> bounds = new ArrayList<Type>();
+    ArrayList<Type> bounds = new ArrayList<>();
 
     public Type[] getUpperBounds()
     {
@@ -154,7 +154,7 @@ public interface Type
   class ParameterizedTypeNode extends TypeNode
   {
 
-    ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+    ArrayList<Type> actualTypeArguments = new ArrayList<>();
 
     public Type[] getActualTypeArguments()
     {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@ public class CheckpointTest
   private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
   {
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
     private transient int windowCount;
 
     private int checkpointState;
@@ -326,7 +326,7 @@ public class CheckpointTest
 
   public List<Checkpoint> getCheckpoints(Long... windowIds)
   {
-    List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+    List<Checkpoint> list = new ArrayList<>(windowIds.length);
     for (Long windowId : windowIds) {
       list.add(new Checkpoint(windowId, 0, 0));
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@ public class GenericOperatorPropertyCodecTest
   public void testGenericOperatorPropertyCodec()
   {
     LogicalPlan dag = new LogicalPlan();
-    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
     codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
     dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@ public class PartitioningTest
     /*
      * Received tuples are stored in a map keyed with the system assigned operator id.
      */
-    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
     private transient int operatorId;
     public String prefix = "";
 
@@ -107,7 +107,7 @@ public class PartitioningTest
         synchronized (receivedTuples) {
           List<Object> l = receivedTuples.get(id);
           if (l == null) {
-            l = Collections.synchronizedList(new ArrayList<Object>());
+            l = Collections.synchronizedList(new ArrayList<>());
             //LOG.debug("adding {} {}", id, l);
             receivedTuples.put(id, l);
           }
@@ -121,12 +121,12 @@ public class PartitioningTest
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
   }
 
   public static class TestInputOperator<T> extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
     transient boolean first;
     transient long windowId;
     boolean blockEndStream = false;
@@ -178,9 +178,9 @@ public class PartitioningTest
     CollectorOperator.receivedTuples.clear();
 
     TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
-    input.testTuples = new ArrayList<List<Integer>>();
+    input.testTuples = new ArrayList<>();
     for (Integer[] tuples: testData) {
-      input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+      input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
     }
     CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
     collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@ public class PartitioningTest
     {
       Map<Integer, Integer> m = loadIndicators.get();
       if (m == null) {
-        loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+        loadIndicators.set(m = new ConcurrentHashMap<>());
       }
       m.put(oper.getId(), load);
     }
@@ -341,7 +341,7 @@ public class PartitioningTest
     Assert.assertNotNull("" + nodeMap, inputDeployed);
 
     // add tuple that matches the partition key and check that each partition receives it
-    ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+    ArrayList<Integer> inputTuples = new ArrayList<>();
     LOG.debug("Number of partitions {}", partitions.size());
     for (PTOperator p: partitions) {
       // default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@ public class PartitioningTest
     @Override
     public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
     {
-      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
       Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
       Partition<PartitionableInputOperator> templatePartition;
       for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@ public class PartitioningTest
           op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
         }
         op.partitionProperty += "_" + i;
-        newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+        newPartitions.add(new DefaultPartition<>(op));
       }
       return newPartitions;
     }
@@ -431,7 +431,7 @@ public class PartitioningTest
       lc.runAsync();
 
       List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      Set<String> partProperties = new HashSet<String>();
+      Set<String> partProperties = new HashSet<>();
       for (PTOperator p : partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@ public class PartitioningTest
       PartitionLoadWatch.remove(partitions.get(0));
 
       partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      partProperties = new HashSet<String>();
+      partProperties = new HashSet<>();
       for (PTOperator p: partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@ public class StreamCodecTest
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1036,7 +1036,7 @@ public class StreamCodecTest
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n3meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
       }
@@ -1063,7 +1063,7 @@ public class StreamCodecTest
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1144,7 +1144,7 @@ public class StreamCodecTest
 
   private Set<PTOperator> getUnifiers(PhysicalPlan plan)
   {
-    Set<PTOperator> unifiers = new HashSet<PTOperator>();
+    Set<PTOperator> unifiers = new HashSet<>();
     for (PTContainer container : plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
         if (operator.isUnifier()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c606f47..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@ public class StreamingContainerManagerTest
     input.portName = "inputPortNameOnNode";
     input.sourceNodeId = 99;
 
-    ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+    ndi.inputs = new ArrayList<>();
     ndi.inputs.add(input);
 
     OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
     output.declaredStreamId = "streamFromNode";
     output.portName = "outputPortNameOnNode";
 
-    ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+    ndi.outputs = new ArrayList<>();
     ndi.outputs.add(output);
 
     ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@ public class ApexCliMiscTest
 {
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @Before

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@ public class ApexCliTest
   static TemporaryFolder testFolder = new TemporaryFolder();
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index d1a18ae..26aced8 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -112,8 +112,8 @@ public class DefaultStatefulStreamCodecTest
   @Test
   public void testString()
   {
-    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
-    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<>();
+    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<>();
 
     String hello = "hello";
 
@@ -182,7 +182,7 @@ public class DefaultStatefulStreamCodecTest
   public void testFinalFieldSerialization() throws Exception
   {
     TestTuple t1 = new TestTuple(5);
-    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
     DataStatePair dsp = c.toDataStatePair(t1);
     TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp);
     Assert.assertEquals("", t1.finalField, t2.finalField);
@@ -208,7 +208,7 @@ public class DefaultStatefulStreamCodecTest
     Object inner = outer.new InnerClass();
 
     for (Object o: new Object[] {outer, inner}) {
-      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
       DataStatePair dsp = c.toDataStatePair(o);
       c.fromDataStatePair(dsp);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index cc777f7..64298de 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -77,7 +77,7 @@ public class AtMostOnceTest extends ProcessingModeTests
   @Override
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
 
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@ public class GenericNodeTest
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+    DefaultOutputPort<Object> op = new DefaultOutputPort<>();
 
     @Override
     public void beginWindow(long windowId)
@@ -226,7 +226,7 @@ public class GenericNodeTest
 
   public static class CheckpointDistanceOperator extends GenericOperator
   {
-    List<Integer> distances = new ArrayList<Integer>();
+    List<Integer> distances = new ArrayList<>();
     int numWindows = 0;
     int maxWindows = 0;
 
@@ -245,7 +245,7 @@ public class GenericNodeTest
   public void testSynchingLogic() throws InterruptedException
   {
     long sleeptime = 25L;
-    final ArrayList<Object> list = new ArrayList<Object>();
+    final ArrayList<Object> list = new ArrayList<>();
     GenericOperator go = new GenericOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
         new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@ public class GenericNodeTest
     final Server bufferServer = new Server(eventloop, 0); // find random port
     final int bufferServerPort = bufferServer.run().getPort();
 
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
-    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
 
     GenericTestOperator go = new GenericTestOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@ public class GenericNodeTest
     CheckpointDistanceOperator go = new CheckpointDistanceOperator();
     go.maxWindows = maxWindows;
 
-    List<Integer> checkpoints = new ArrayList<Integer>();
+    List<Integer> checkpoints = new ArrayList<>();
 
     int window = 0;
     while (window < maxWindows) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@ public class InputOperatorTest
 
   public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
   {
-    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
-    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
-    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
-    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
     private volatile Thread dataGeneratorThread;
 
     @Override
@@ -179,7 +179,7 @@ public class InputOperatorTest
     public void setConnected(boolean flag)
     {
       if (flag) {
-        collections.put(id, list = new ArrayList<T>());
+        collections.put(id, list = new ArrayList<>());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@ public class NodeTest
 
     }
 
-    static final ArrayList<Call> calls = new ArrayList<Call>();
+    static final ArrayList<Call> calls = new ArrayList<>();
 
     @Override
     public void save(Object object, int operatorId, long windowId) throws IOException