You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/04/09 20:39:38 UTC
[2/2] apex-core git commit: APEXCORE-691 Use type inference for
generic instance creation closes #505
APEXCORE-691 Use type inference for generic instance creation
closes #505
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/aa81bea3
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/aa81bea3
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/aa81bea3
Branch: refs/heads/master
Commit: aa81bea306aed51fd881d97ce62a01537eeb2003
Parents: 9383613
Author: Apex Dev <de...@apex.apache.org>
Authored: Fri Apr 7 10:08:45 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Apr 9 13:38:47 2017 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/api/AffinityRule.java | 2 +-
.../java/com/datatorrent/api/Attribute.java | 6 +-
.../main/java/com/datatorrent/api/Context.java | 76 ++++++++++----------
.../java/com/datatorrent/api/StringCodec.java | 8 +--
.../org/apache/apex/api/YarnAppLauncher.java | 6 +-
.../com/datatorrent/api/AttributeMapTest.java | 2 +-
.../bufferserver/internal/DataList.java | 10 +--
.../bufferserver/internal/LogicalNode.java | 4 +-
.../datatorrent/bufferserver/server/Server.java | 4 +-
.../datatorrent/bufferserver/util/System.java | 2 +-
.../packet/SubscribeRequestTupleTest.java | 2 +-
.../bufferserver/support/Subscriber.java | 2 +-
.../partitioner/StatelessPartitioner.java | 18 ++---
.../common/security/SecurityContext.java | 6 +-
.../common/util/DefaultDelayOperator.java | 2 +-
.../util/JacksonObjectMapperProvider.java | 6 +-
.../common/util/PubSubMessageCodec.java | 14 ++--
.../partitioner/StatelessPartitionerTest.java | 18 ++---
.../common/util/SerializableObjectTest.java | 4 +-
.../BlacklistBasedResourceRequestHandler.java | 4 +-
.../stram/ResourceRequestHandler.java | 4 +-
.../stram/client/PermissionsInfo.java | 8 +--
.../stram/client/StramClientUtils.java | 2 +-
.../stram/engine/OperatorContext.java | 4 +-
.../stram/plan/logical/LogicalPlan.java | 10 +--
.../stram/plan/physical/PhysicalPlan.java | 4 +-
.../stram/util/PubSubWebSocketClient.java | 4 +-
.../stram/webapp/asm/CompactUtil.java | 2 +-
.../com/datatorrent/stram/webapp/asm/Type.java | 4 +-
.../com/datatorrent/stram/CheckpointTest.java | 4 +-
.../stram/GenericOperatorPropertyCodecTest.java | 2 +-
.../com/datatorrent/stram/PartitioningTest.java | 24 +++----
.../com/datatorrent/stram/StreamCodecTest.java | 8 +--
.../stram/StreamingContainerManagerTest.java | 4 +-
.../datatorrent/stram/cli/ApexCliMiscTest.java | 2 +-
.../com/datatorrent/stram/cli/ApexCliTest.java | 2 +-
.../codec/DefaultStatefulStreamCodecTest.java | 8 +--
.../stram/engine/AtMostOnceTest.java | 2 +-
.../stram/engine/GenericNodeTest.java | 12 ++--
.../stram/engine/InputOperatorTest.java | 10 +--
.../com/datatorrent/stram/engine/NodeTest.java | 2 +-
.../stram/engine/ProcessingModeTests.java | 8 +--
.../datatorrent/stram/engine/SliderTest.java | 2 +-
.../com/datatorrent/stram/engine/StatsTest.java | 6 +-
.../stram/engine/StreamingContainerTest.java | 2 +-
.../engine/TestGeneratorInputOperator.java | 4 +-
.../stram/engine/WindowGeneratorTest.java | 2 +-
.../moduleexperiment/InjectConfigTest.java | 2 +-
.../stram/plan/StreamPersistanceTests.java | 8 +--
.../datatorrent/stram/plan/TestPlanContext.java | 2 +-
.../logical/LogicalPlanConfigurationTest.java | 6 +-
.../stram/plan/logical/LogicalPlanTest.java | 12 ++--
.../plan/logical/module/ModuleAppTest.java | 10 +--
.../logical/module/TestModuleExpansion.java | 2 +-
.../stram/plan/logical/module/TestModules.java | 4 +-
.../stram/plan/physical/PhysicalPlanTest.java | 26 +++----
.../stream/BufferServerSubscriberTest.java | 2 +-
.../stram/stream/FastPublisherTest.java | 2 +-
.../stram/stream/FastStreamTest.java | 2 +-
.../stram/stream/InlineStreamTest.java | 8 +--
.../stram/stream/OiOEndWindowTest.java | 4 +-
.../datatorrent/stram/stream/OiOStreamTest.java | 14 ++--
.../stram/stream/SocketStreamTest.java | 2 +-
.../support/ManualScheduledExecutorService.java | 2 +-
.../stram/support/StramTestSupport.java | 6 +-
.../stram/util/StablePriorityQueueTest.java | 4 +-
.../stram/webapp/OperatorDiscoveryTest.java | 26 +++----
.../stram/webapp/StramWebServicesTest.java | 6 +-
.../stram/webapp/TypeDiscoveryTest.java | 12 ++--
69 files changed, 252 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/AffinityRule.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java
index 5e10ccd..304c086 100644
--- a/api/src/main/java/com/datatorrent/api/AffinityRule.java
+++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java
@@ -92,7 +92,7 @@ public class AffinityRule implements Serializable
public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators)
{
this(type, locality, relaxLocality);
- LinkedList<String> operators = new LinkedList<String>();
+ LinkedList<String> operators = new LinkedList<>();
if (firstOperator != null && otherOperators.length >= 1) {
operators.add(firstOperator);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 2efc84f..821ecb2 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -236,11 +236,11 @@ public class Attribute<T> implements Serializable
*/
public static class AttributeInitializer
{
- static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>();
+ static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
{
- Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>();
+ Map<Attribute<Object>, Object> result = new HashMap<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@@ -273,7 +273,7 @@ public class Attribute<T> implements Serializable
if (map.containsKey(clazz)) {
return 0;
}
- Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
+ Set<Attribute<Object>> set = new HashSet<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 3d3cffe..94022ff 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -121,17 +121,17 @@ public interface Context
/**
* Number of tuples the poll buffer can cache without blocking the input stream to the port.
*/
- Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024);
+ Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024);
/**
* The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container.
* Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may
* not be adhered to.
*/
- Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64);
+ Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64);
/**
* Poll period in milliseconds when the port buffer reaches its limits.
*/
- Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+ Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
/**
* Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge.
* Can be used to form parallel partitions that span a group of operators.
@@ -139,7 +139,7 @@ public interface Context
* If multiple ports of an operator have the setting, incoming streams must track back to
* a common root partition, i.e. the operator join forks of the same origin.
*/
- Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false);
+ Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false);
/**
* Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the
* number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions
@@ -147,7 +147,7 @@ public interface Context
* network I/O or other resource requirement for each unifier container (depends on the specific functionality of
* the unifier), enabling horizontal scale by overcoming the single unifier bottleneck.
*/
- Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE);
/**
* Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning
@@ -158,16 +158,16 @@ public interface Context
* the inputs. In this case the default unifier behavior can be specified on the output port and individual
* exceptions can be specified on the corresponding input ports.
*/
- Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE);
+ Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE);
/**
* Whether or not to auto record the tuples
*/
- Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+ Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
/**
* Whether the output is unified.
* This is a read-only attribute to query whether the output of the operator from multiple instances is being unified.
*/
- Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false);
+ Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false);
/**
* Provide the codec which can be used to serialize or deserialize the data
* that can be received on the port. If it is unspecified the engine may use
@@ -193,13 +193,13 @@ public interface Context
* of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state
* is recovered.
*/
- Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
+ Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID);
/**
* It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
* operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS milliseconds.
* Default value is 10 milliseconds.
*/
- Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+ Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
/**
* The maximum number of attempts to restart a failing operator before shutting down the application.
* Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the
@@ -218,15 +218,15 @@ public interface Context
* by the engine. The attribute is ignored when the operator was already declared stateless through the
* {@link Stateless} annotation.
*/
- Attribute<Boolean> STATELESS = new Attribute<Boolean>(false);
+ Attribute<Boolean> STATELESS = new Attribute<>(false);
/**
* Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers.
*/
- Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024);
+ Attribute<Integer> MEMORY_MB = new Attribute<>(1024);
/**
* CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers.
*/
- Attribute<Integer> VCORES = new Attribute<Integer>(0);
+ Attribute<Integer> VCORES = new Attribute<>(0);
/**
* The options to be passed to the JVM when launching the operator. Options such as java maximum heap size can be specified here.
@@ -235,7 +235,7 @@ public interface Context
/**
* Attribute of the operator that tells the platform how many streaming windows make 1 application window.
*/
- Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1);
+ Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1);
/**
* When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is
* slid by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
@@ -251,7 +251,7 @@ public interface Context
* value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing
* will be done at application window boundary.
*/
- Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1);
+ Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1);
/**
* Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity).
* For example, the user may wish to specify a locality constraint for an input operator relative to its data source.
@@ -274,18 +274,18 @@ public interface Context
* If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators
* should be specified as AT_MOST_ONCE otherwise it will result in an error.
*/
- Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE);
+ Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE);
/**
* Timeout to identify stalled processing, specified as count of streaming windows. If the last processed
* window does not advance within the specified timeout count, the operator will be considered stuck and the
* container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue,
* blocking operator logic, etc.
*/
- Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120);
+ Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120);
/**
* Whether or not to auto record the tuples
*/
- Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+ Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
/**
* How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute.
* If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the
@@ -348,7 +348,7 @@ public interface Context
* Name under which the application will be shown in the resource manager.
* If not set, the default is the configuration Java class or property file name.
*/
- Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name");
+ Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name");
/**
* URL to the application's documentation.
*/
@@ -387,7 +387,7 @@ public interface Context
/**
* Dump extra debug information in launcher, master and containers.
*/
- Attribute<Boolean> DEBUG = new Attribute<Boolean>(false);
+ Attribute<Boolean> DEBUG = new Attribute<>(false);
/**
* The options to be passed to the JVM when launching the containers. Options such as java maximum heap size can be specified here.
*/
@@ -396,20 +396,20 @@ public interface Context
* The amount of memory to be requested for the application master. Not used in local mode.
* Default value is 1GB.
*/
- Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024);
+ Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024);
/**
* Whether to spool the data once the buffer server capacity is reached.
*/
- Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true);
+ Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true);
/**
* The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms.
*/
- Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500);
+ Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500);
/**
* The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator
* state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows.
*/
- Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60);
+ Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60);
/**
* The path to store application dependencies, recording and other generated files for application master and containers.
*/
@@ -418,13 +418,13 @@ public interface Context
* The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
* in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k.
*/
- Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024);
+ Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024);
/**
* The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
* in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file
* is created and the tuples start getting stored in the new file. Default value is 30hrs.
*/
- Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000);
+ Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000);
/**
* Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration.
*/
@@ -432,7 +432,7 @@ public interface Context
/**
* Whether or not gateway is expecting SSL connection.
*/
- Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false);
+ Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false);
/**
* The username for logging in to the gateway, if authentication is enabled.
*/
@@ -448,48 +448,48 @@ public interface Context
/**
* Maximum number of simultaneous heartbeat connections to process. Default value is 30.
*/
- Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30);
+ Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30);
/**
* How frequently should operators heartbeat to stram. Recommended setting is
* 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms.
*/
- Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000);
+ Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000);
/**
* Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart.
* Default value is 30s.
*/
- Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000);
+ Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000);
/**
* Timeout for allocating container resources, in milliseconds. Default value is Integer.MAX_VALUE.
*/
- Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE);
/**
* Maximum number of windows that can be pending for statistics calculation. Statistics are computed when
* the metrics are available from all operators for a window. If the information is not available from all operators then
* the window is pending. When the number of pending windows reaches this limit the information for the oldest window
* is purged. Default value is 1000 windows.
*/
- Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000);
+ Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000);
/**
* Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false.
*/
- Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false);
+ Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false);
/**
* The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or
* equal to the throughput calculation interval. The default value is 10s.
*/
- Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000);
+ Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000);
/**
* The maximum number of samples to use when calculating throughput. In practice fewer samples may be used
* if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples.
*/
- Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000);
+ Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000);
/**
* The number of samples to use when using RPC latency to compensate for clock skews and network latency when
* calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100
* samples.
*/
- Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100);
+ Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100);
/**
* The agent which can be used to find the jvm options for the container.
*/
@@ -511,12 +511,12 @@ public interface Context
* blacklisting of nodes by application master
* Blacklisting for nodes is disabled for the default value
*/
- Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE);
/**
* The amount of time to wait before removing failed nodes from blacklist
*/
- Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000));
+ Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000));
/**
* Affinity rules for specifying affinity and anti-affinity between logical operators
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/StringCodec.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index d4a0a41..fa8ab23 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -302,7 +302,7 @@ public interface StringCodec<T>
return clazz.getConstructor(String.class).newInstance(parts[1]);
} else {
T object = clazz.getConstructor(String.class).newInstance(parts[1]);
- HashMap<String, String> hashMap = new HashMap<String, String>();
+ HashMap<String, String> hashMap = new HashMap<>();
for (int i = 2; i < parts.length; i++) {
String[] keyValPair = parts[i].split(propertySeparator, 2);
hashMap.put(keyValPair[0], keyValPair[1]);
@@ -365,11 +365,11 @@ public interface StringCodec<T>
}
if (string.isEmpty()) {
- return new HashMap<K, V>();
+ return new HashMap<>();
}
String[] parts = string.split(separator);
- HashMap<K, V> map = new HashMap<K, V>();
+ HashMap<K, V> map = new HashMap<>();
for (String part : parts) {
String[] kvpair = part.split(equal, 2);
map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1]));
@@ -433,7 +433,7 @@ public interface StringCodec<T>
}
String[] parts = string.split(separator);
- ArrayList<T> arrayList = new ArrayList<T>(parts.length);
+ ArrayList<T> arrayList = new ArrayList<>(parts.length);
for (String part : parts) {
arrayList.add(codec.fromString(part));
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
index 82cf50e..8ff0205 100644
--- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -36,17 +36,17 @@ public abstract class YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> e
/**
* Parameter to specify extra jars for launch.
*/
- public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String());
/**
* Parameter to specify the previous application id to use to resume launch from.
*/
- public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String());
/**
* Parameter to specify the queue name to use for launch.
*/
- public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String());
static {
Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
index fcb1809..b463619 100644
--- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
+++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
@@ -51,7 +51,7 @@ public class AttributeMapTest
interface iface
{
- Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello);
+ Attribute<Greeting> greeting = new Attribute<>(Greeting.hello);
}
@Test
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 84999fa..d08b9fc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -336,14 +336,14 @@ public class DataList
{
all_listeners.add(dl);
//logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
- ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+ ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
HashSet<DataListener> set;
if (listeners.containsKey(partition)) {
set = listeners.get(partition);
} else {
- set = new HashSet<DataListener>();
+ set = new HashSet<>();
listeners.put(partition, set);
}
set.add(dl);
@@ -353,7 +353,7 @@ public class DataList
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
set = listeners.get(DataListener.NULL_PARTITION);
} else {
- set = new HashSet<DataListener>();
+ set = new HashSet<>();
listeners.put(DataListener.NULL_PARTITION, set);
}
@@ -363,7 +363,7 @@ public class DataList
public void removeDataListener(DataListener dl)
{
- ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+ ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
if (listeners.containsKey(partition)) {
@@ -459,7 +459,7 @@ public class DataList
// When the number of subscribers becomes high or the number of blocks becomes high, consider optimizing it.
Block b = first;
- Map<Block, Integer> indices = new HashMap<Block, Integer>();
+ Map<Block, Integer> indices = new HashMap<>();
int i = 0;
while (b != null) {
indices.put(b, i++);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 08a483a..b06e60a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -71,8 +71,8 @@ public class LogicalNode implements DataListener
this.identifier = identifier;
this.upstream = upstream;
this.group = group;
- this.physicalNodes = new HashSet<PhysicalNode>();
- this.partitions = new HashSet<BitVector>();
+ this.physicalNodes = new HashSet<>();
+ this.partitions = new HashSet<>();
this.iterator = iterator;
this.skipWindowId = skipWindowId;
this.eventloop = eventloop;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7ac518b..8a56b51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -260,7 +260,7 @@ public class Server extends AbstractServer
}
private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
- private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
+ private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
private final int blockSize;
private final int numberOfCacheBlocks;
@@ -883,7 +883,7 @@ public class Server extends AbstractServer
}
}
- ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
+ ArrayList<LogicalNode> list = new ArrayList<>();
String publisherIdentifier = datalist.getIdentifier();
Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
while (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 124cc5f..000ce00 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -31,7 +31,7 @@ import com.datatorrent.netlet.EventLoop;
*/
public class System
{
- private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>();
+ private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<>();
public static void startup(String identifier)
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index f7b8829..371f98a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -42,7 +42,7 @@ public class SubscribeRequestTupleTest
String down_type = "SubscriberId/StreamType";
String upstream_id = "PublisherId";
int mask = 7;
- ArrayList<Integer> partitions = new ArrayList<Integer>();
+ ArrayList<Integer> partitions = new ArrayList<>();
partitions.add(5);
long startingWindowId = 0xcafebabe00000078L;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 3c0cb0e..d6e9b1a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -32,7 +32,7 @@ import com.datatorrent.bufferserver.packet.Tuple;
*/
public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber
{
- public final ArrayList<Object> resetPayloads = new ArrayList<Object>();
+ public final ArrayList<Object> resetPayloads = new ArrayList<>();
public AtomicInteger tupleCount = new AtomicInteger(0);
public WindowIdHolder firstPayload;
public WindowIdHolder lastPayload;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index 165d8cf..d77b1ae 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -114,7 +114,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
newPartitions = Lists.newArrayList();
for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
- newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
}
// partition the stream that was first connected in the DAG and send full data to remaining input ports
@@ -156,8 +156,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
*/
public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
{
- List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
- HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+ List<Partition<T>> newPartitions = new ArrayList<>();
+ HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
for (Partition<T> p: partitions) {
int load = p.getLoad();
if (load < 0) {
@@ -201,7 +201,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
}
for (int key: newKeys) {
- Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+ Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
newPartitions.add(newPartition);
}
@@ -224,8 +224,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
*/
public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
{
- List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
- List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+ List<Partition<T>> newPartitions = new ArrayList<>();
+ List<Partition<T>> lowLoadPartitions = new ArrayList<>();
for (Partition<T> p: partitions) {
int load = p.getLoad();
if (load < 0) {
@@ -235,8 +235,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
lowLoadPartitions.add(p);
}
} else if (load > 0) {
- newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
- newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
} else {
newPartitions.add(p);
}
@@ -274,7 +274,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
T anOperator = newPartitions.iterator().next().getPartitionedInstance();
while (morePartitionsToCreate-- > 0) {
- DefaultPartition<T> partition = new DefaultPartition<T>(anOperator);
+ DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
newPartitions.add(partition);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
index dccd7b7..3dc4dda 100644
--- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
+++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
@@ -32,17 +32,17 @@ public interface SecurityContext extends Context
/**
* Attribute for the user name for login.
*/
- Attribute<String> USER_NAME = new Attribute<String>((String)null);
+ Attribute<String> USER_NAME = new Attribute<>((String)null);
/**
* Attribute for the password for login.
*/
- Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null);
+ Attribute<char[]> PASSWORD = new Attribute<>((char[])null);
/**
* Attribute for the realm for login.
*/
- Attribute<String> REALM = new Attribute<String>((String)null);
+ Attribute<String> REALM = new Attribute<>((String)null);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
index ca7490d..f90a888 100644
--- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -48,7 +48,7 @@ public class DefaultDelayOperator<T> extends BaseOperator implements Operator.De
}
};
- public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
protected List<T> lastWindowTuples = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 7723fed..ef837a8 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -55,9 +55,9 @@ public class JacksonObjectMapperProvider implements ContextResolver<ObjectMapper
this.objectMapper = new ObjectMapper();
objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
- module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class));
- module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class));
- module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class));
+ module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class));
+ module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class));
+ module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class));
objectMapper.registerModule(module);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index 63d1646..af5e10e 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -53,7 +53,7 @@ public class PubSubMessageCodec<T>
*/
public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.PUBLISH);
pubSubMessage.setTopic(topic);
pubSubMessage.setData(data);
@@ -72,7 +72,7 @@ public class PubSubMessageCodec<T>
*/
public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.SUBSCRIBE);
pubSubMessage.setTopic(topic);
@@ -90,7 +90,7 @@ public class PubSubMessageCodec<T>
*/
public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE);
pubSubMessage.setTopic(topic);
@@ -108,7 +108,7 @@ public class PubSubMessageCodec<T>
*/
public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS);
pubSubMessage.setTopic(topic);
@@ -126,7 +126,7 @@ public class PubSubMessageCodec<T>
*/
public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS);
pubSubMessage.setTopic(topic);
@@ -135,7 +135,7 @@ public class PubSubMessageCodec<T>
public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
{
- HashMap<String, Object> map = new HashMap<String, Object>();
+ HashMap<String, Object> map = new HashMap<>();
map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
T data = pubSubMessage.getData();
@@ -156,7 +156,7 @@ public class PubSubMessageCodec<T>
public PubSubMessage<T> parseMessage(String message) throws IOException
{
HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY));
pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY));
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index e7c4887..2e48f54 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -41,7 +41,7 @@ public class StatelessPartitionerTest
public static class DummyOperator implements Operator
{
- public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
private Integer value;
@@ -93,10 +93,10 @@ public class StatelessPartitionerTest
public void partition1Test()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
partitions.add(defaultPartition);
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -111,10 +111,10 @@ public class StatelessPartitionerTest
public void partition5Test()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5);
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
partitions.add(defaultPartition);
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -137,10 +137,10 @@ public class StatelessPartitionerTest
public void testParallelPartitionScaleUP()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+ partitions.add(new DefaultPartition<>(dummyOperator));
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
new PartitioningContextImpl(null, 5));
@@ -151,12 +151,12 @@ public class StatelessPartitionerTest
public void testParallelPartitionScaleDown()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
for (int i = 5; i-- > 0; ) {
- partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+ partitions.add(new DefaultPartition<>(dummyOperator));
}
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index 97debe3..79fdac7 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -50,7 +50,7 @@ public class SerializableObjectTest
}
};
- public final transient OutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient OutputPort<T> output = new DefaultOutputPort<>();
private int i;
public void setI(int i)
@@ -109,7 +109,7 @@ public class SerializableObjectTest
@Test
public void testReadResolve() throws Exception
{
- SerializableOperator<Object> pre = new SerializableOperator<Object>();
+ SerializableOperator<Object> pre = new SerializableOperator<>();
pre.setI(10);
FileOutputStream fos = new FileOutputStream(filename);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 53d91a5..80314c7 100644
--- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -73,7 +73,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
for (ContainerRequest cr : requests) {
ContainerStartRequest csr = hostSpecificRequests.get(cr);
ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr);
requestedResources.put(csr, pair);
containerRequests.add(newCr);
hostSpecificRequests.remove(cr);
@@ -91,7 +91,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) {
ContainerRequest cr = entry.getKey();
ContainerStartRequest csr = entry.getValue();
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
requestedResources.put(csr, pair);
containerRequests.add(cr);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index e7f9672..45206bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -102,7 +102,7 @@ public class ResourceRequestHandler
*/
public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
{
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
requestedResources.put(csr, pair);
containerRequests.add(cr);
}
@@ -164,7 +164,7 @@ public class ResourceRequestHandler
public List<String> getNodesExceptHost(List<String> hostNames)
{
- List<String> nodesList = new ArrayList<String>();
+ List<String> nodesList = new ArrayList<>();
Set<String> hostNameSet = Sets.newHashSet();
hostNameSet.addAll(hostNames);
for (String host : nodeReportMap.keySet()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
index 3a61ee6..b374447 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
@@ -32,10 +32,10 @@ import org.codehaus.jettison.json.JSONObject;
public class PermissionsInfo
{
- private final Set<String> readOnlyRoles = new TreeSet<String>();
- private final Set<String> readOnlyUsers = new TreeSet<String>();
- private final Set<String> readWriteRoles = new TreeSet<String>();
- private final Set<String> readWriteUsers = new TreeSet<String>();
+ private final Set<String> readOnlyRoles = new TreeSet<>();
+ private final Set<String> readOnlyUsers = new TreeSet<>();
+ private final Set<String> readWriteRoles = new TreeSet<>();
+ private final Set<String> readWriteUsers = new TreeSet<>();
private boolean readOnlyEveryone = false;
private boolean readWriteEveryone = false;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 050729d..8076d4a 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -271,7 +271,7 @@ public class StramClientUtils
}
Text rmTokenService = new Text(Joiner.on(',').join(services));
- return new Token<RMDelegationTokenIdentifier>(
+ return new Token<>(
rmDelegationToken.getIdentifier().array(),
rmDelegationToken.getPassword().array(),
new Text(rmDelegationToken.getKind()),
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 7113280..284aefb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -50,8 +50,8 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
private final int id;
private final String name;
// the size of the circular queue should be configurable. hardcoded to 1024 for now.
- private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
- private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
+ private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
+ private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
public final boolean stateless;
private int windowsFromCheckpoint;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 401eea9..62c4fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -760,7 +760,7 @@ public class LogicalPlan implements Serializable, DAG
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
- StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+ StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator);
persistOperatorPortMeta.setStreamCodec(codec);
}
}
@@ -1907,14 +1907,14 @@ public class LogicalPlan implements Serializable, DAG
HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>();
HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>();
- List<String> operatorNames = new ArrayList<String>();
+ List<String> operatorNames = new ArrayList<>();
for (OperatorMeta operator : getAllOperators()) {
operatorNames.add(operator.getName());
- Set<String> containerSet = new HashSet<String>();
+ Set<String> containerSet = new HashSet<>();
containerSet.add(operator.getName());
containerAffinities.put(operator.getName(), containerSet);
- Set<String> nodeSet = new HashSet<String>();
+ Set<String> nodeSet = new HashSet<>();
nodeSet.add(operator.getName());
nodeAffinities.put(operator.getName(), nodeSet);
@@ -2073,7 +2073,7 @@ public class LogicalPlan implements Serializable, DAG
*/
public void convertRegexToList(List<String> operatorNames, AffinityRule rule)
{
- List<String> operators = new LinkedList<String>();
+ List<String> operators = new LinkedList<>();
Pattern p = Pattern.compile(rule.getOperatorRegex());
for (String name : operatorNames) {
if (p.matcher(name).matches()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..a1da94a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -477,13 +477,13 @@ public class PhysicalPlan implements Serializable
// Log container anti-affinity
if (LOG.isDebugEnabled()) {
for (PTContainer container : containers) {
- List<String> antiOperators = new ArrayList<String>();
+ List<String> antiOperators = new ArrayList<>();
for (PTContainer c : container.getStrictAntiPrefs()) {
for (PTOperator operator : c.getOperators()) {
antiOperators.add(operator.getName());
}
}
- List<String> containerOperators = new ArrayList<String>();
+ List<String> containerOperators = new ArrayList<>();
for (PTOperator operator : container.getOperators()) {
containerOperators.add(operator.getName());
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@ public abstract class PubSubWebSocketClient implements Component<Context>
*/
public PubSubWebSocketClient()
{
- throwable = new AtomicReference<Throwable>();
+ throwable = new AtomicReference<>();
ioThreadMultiplier = 1;
mapper = (new JacksonObjectMapperProvider()).getContext(null);
- codec = new PubSubMessageCodec<Object>(mapper);
+ codec = new PubSubMessageCodec<>(mapper);
AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
config.setIoThreadMultiplier(ioThreadMultiplier);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@ public class CompactUtil
List<CompactAnnotationNode> annotations = new LinkedList<>();
for (Object visibleAnnotation : fn.visibleAnnotations) {
CompactAnnotationNode node = new CompactAnnotationNode();
- Map<String, Object> annotationMap = new HashMap<String, Object>();
+ Map<String, Object> annotationMap = new HashMap<>();
if (visibleAnnotation instanceof AnnotationNode) {
AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
if (annotation.desc.contains("InputPortFieldAnnotation")
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@ public interface Type
char boundChar;
- ArrayList<Type> bounds = new ArrayList<Type>();
+ ArrayList<Type> bounds = new ArrayList<>();
public Type[] getUpperBounds()
{
@@ -154,7 +154,7 @@ public interface Type
class ParameterizedTypeNode extends TypeNode
{
- ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+ ArrayList<Type> actualTypeArguments = new ArrayList<>();
public Type[] getActualTypeArguments()
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@ public class CheckpointTest
private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
{
@OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
private transient int windowCount;
private int checkpointState;
@@ -326,7 +326,7 @@ public class CheckpointTest
public List<Checkpoint> getCheckpoints(Long... windowIds)
{
- List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+ List<Checkpoint> list = new ArrayList<>(windowIds.length);
for (Long windowId : windowIds) {
list.add(new Checkpoint(windowId, 0, 0));
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@ public class GenericOperatorPropertyCodecTest
public void testGenericOperatorPropertyCodec()
{
LogicalPlan dag = new LogicalPlan();
- Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+ Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@ public class PartitioningTest
/*
* Received tuples are stored in a map keyed with the system assigned operator id.
*/
- public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+ public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
private transient int operatorId;
public String prefix = "";
@@ -107,7 +107,7 @@ public class PartitioningTest
synchronized (receivedTuples) {
List<Object> l = receivedTuples.get(id);
if (l == null) {
- l = Collections.synchronizedList(new ArrayList<Object>());
+ l = Collections.synchronizedList(new ArrayList<>());
//LOG.debug("adding {} {}", id, l);
receivedTuples.put(id, l);
}
@@ -121,12 +121,12 @@ public class PartitioningTest
};
@OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
}
public static class TestInputOperator<T> extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
transient boolean first;
transient long windowId;
boolean blockEndStream = false;
@@ -178,9 +178,9 @@ public class PartitioningTest
CollectorOperator.receivedTuples.clear();
TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
- input.testTuples = new ArrayList<List<Integer>>();
+ input.testTuples = new ArrayList<>();
for (Integer[] tuples: testData) {
- input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+ input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
}
CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@ public class PartitioningTest
{
Map<Integer, Integer> m = loadIndicators.get();
if (m == null) {
- loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+ loadIndicators.set(m = new ConcurrentHashMap<>());
}
m.put(oper.getId(), load);
}
@@ -341,7 +341,7 @@ public class PartitioningTest
Assert.assertNotNull("" + nodeMap, inputDeployed);
// add tuple that matches the partition key and check that each partition receives it
- ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+ ArrayList<Integer> inputTuples = new ArrayList<>();
LOG.debug("Number of partitions {}", partitions.size());
for (PTOperator p: partitions) {
// default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@ public class PartitioningTest
@Override
public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
{
- List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+ List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
Partition<PartitionableInputOperator> templatePartition;
for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@ public class PartitioningTest
op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
}
op.partitionProperty += "_" + i;
- newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+ newPartitions.add(new DefaultPartition<>(op));
}
return newPartitions;
}
@@ -431,7 +431,7 @@ public class PartitioningTest
lc.runAsync();
List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- Set<String> partProperties = new HashSet<String>();
+ Set<String> partProperties = new HashSet<>();
for (PTOperator p : partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@ public class PartitioningTest
PartitionLoadWatch.remove(partitions.get(0));
partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- partProperties = new HashSet<String>();
+ partProperties = new HashSet<>();
for (PTOperator p: partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@ public class StreamCodecTest
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
/*
@@ -1036,7 +1036,7 @@ public class StreamCodecTest
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n3meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
}
@@ -1063,7 +1063,7 @@ public class StreamCodecTest
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
/*
@@ -1144,7 +1144,7 @@ public class StreamCodecTest
private Set<PTOperator> getUnifiers(PhysicalPlan plan)
{
- Set<PTOperator> unifiers = new HashSet<PTOperator>();
+ Set<PTOperator> unifiers = new HashSet<>();
for (PTContainer container : plan.getContainers()) {
for (PTOperator operator : container.getOperators()) {
if (operator.isUnifier()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c606f47..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@ public class StreamingContainerManagerTest
input.portName = "inputPortNameOnNode";
input.sourceNodeId = 99;
- ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+ ndi.inputs = new ArrayList<>();
ndi.inputs.add(input);
OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
output.declaredStreamId = "streamFromNode";
output.portName = "outputPortNameOnNode";
- ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+ ndi.outputs = new ArrayList<>();
ndi.outputs.add(output);
ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@ public class ApexCliMiscTest
{
ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
static String userHome;
@Before
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@ public class ApexCliTest
static TemporaryFolder testFolder = new TemporaryFolder();
ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
static String userHome;
@BeforeClass
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index d1a18ae..26aced8 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -112,8 +112,8 @@ public class DefaultStatefulStreamCodecTest
@Test
public void testString()
{
- StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
- StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+ StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<>();
+ StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<>();
String hello = "hello";
@@ -182,7 +182,7 @@ public class DefaultStatefulStreamCodecTest
public void testFinalFieldSerialization() throws Exception
{
TestTuple t1 = new TestTuple(5);
- DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+ DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
DataStatePair dsp = c.toDataStatePair(t1);
TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp);
Assert.assertEquals("", t1.finalField, t2.finalField);
@@ -208,7 +208,7 @@ public class DefaultStatefulStreamCodecTest
Object inner = outer.new InnerClass();
for (Object o: new Object[] {outer, inner}) {
- DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+ DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
DataStatePair dsp = c.toDataStatePair(o);
c.fromDataStatePair(dsp);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index cc777f7..64298de 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -77,7 +77,7 @@ public class AtMostOnceTest extends ProcessingModeTests
@Override
public void testNonLinearOperatorRecovery() throws InterruptedException
{
- final HashSet<Object> collection = new HashSet<Object>();
+ final HashSet<Object> collection = new HashSet<>();
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@ public class GenericNodeTest
};
@OutputPortFieldAnnotation( optional = true)
- DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+ DefaultOutputPort<Object> op = new DefaultOutputPort<>();
@Override
public void beginWindow(long windowId)
@@ -226,7 +226,7 @@ public class GenericNodeTest
public static class CheckpointDistanceOperator extends GenericOperator
{
- List<Integer> distances = new ArrayList<Integer>();
+ List<Integer> distances = new ArrayList<>();
int numWindows = 0;
int maxWindows = 0;
@@ -245,7 +245,7 @@ public class GenericNodeTest
public void testSynchingLogic() throws InterruptedException
{
long sleeptime = 25L;
- final ArrayList<Object> list = new ArrayList<Object>();
+ final ArrayList<Object> list = new ArrayList<>();
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@ public class GenericNodeTest
final Server bufferServer = new Server(eventloop, 0); // find random port
final int bufferServerPort = bufferServer.run().getPort();
- final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
- final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+ final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+ final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
GenericTestOperator go = new GenericTestOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@ public class GenericNodeTest
CheckpointDistanceOperator go = new CheckpointDistanceOperator();
go.maxWindows = maxWindows;
- List<Integer> checkpoints = new ArrayList<Integer>();
+ List<Integer> checkpoints = new ArrayList<>();
int window = 0;
while (window < maxWindows) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@ public class InputOperatorTest
public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
{
- public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
- public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
- private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
- private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+ public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+ private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+ private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
private volatile Thread dataGeneratorThread;
@Override
@@ -179,7 +179,7 @@ public class InputOperatorTest
public void setConnected(boolean flag)
{
if (flag) {
- collections.put(id, list = new ArrayList<T>());
+ collections.put(id, list = new ArrayList<>());
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@ public class NodeTest
}
- static final ArrayList<Call> calls = new ArrayList<Call>();
+ static final ArrayList<Call> calls = new ArrayList<>();
@Override
public void save(Object object, int operatorId, long windowId) throws IOException