You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/01/10 18:58:16 UTC
[1/2] incubator-apex-core git commit: APEXCORE-268 #resolve #comment
removed style violations from common
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 b14583c4d -> aded30c45
APEXCORE-268 #resolve #comment removed style violations from common
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/778436f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/778436f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/778436f7
Branch: refs/heads/devel-3
Commit: 778436f76a21f655d844c18b7a2956115c3853a9
Parents: 7629be8
Author: MalharJenkins <je...@datatorrent.com>
Authored: Fri Jan 8 13:49:38 2016 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Jan 8 13:49:59 2016 -0800
----------------------------------------------------------------------
common/pom.xml | 2 +-
.../common/codec/JsonStreamCodec.java | 13 +++--
.../common/experimental/AppData.java | 14 ++++--
.../common/metric/MetricsAggregator.java | 12 ++---
.../common/metric/SingleMetricAggregator.java | 1 +
.../partitioner/StatelessPartitioner.java | 51 +++++++++-----------
.../auth/callback/DefaultCallbackHandler.java | 10 ++--
.../common/util/AsyncFSStorageAgent.java | 20 ++++++--
.../datatorrent/common/util/BaseOperator.java | 2 -
.../util/BasicContainerOptConfigurator.java | 22 ++++-----
.../datatorrent/common/util/FSStorageAgent.java | 42 ++++++++--------
.../util/JacksonObjectMapperProvider.java | 6 ++-
.../common/util/NameableThreadFactory.java | 4 +-
.../datatorrent/common/util/PubSubMessage.java | 3 +-
.../common/util/PubSubMessageCodec.java | 9 ++--
.../common/util/PubSubWebSocketClient.java | 50 ++++++++++---------
.../common/util/SerializableObject.java | 41 ++++------------
.../common/codec/JsonStreamCodecTest.java | 4 +-
.../partitioner/StatelessPartitionerTest.java | 14 ++++--
.../common/util/AsyncFSStorageAgentTest.java | 13 ++---
.../common/util/FSStorageAgentTest.java | 19 ++++----
.../common/util/SerializableObjectTest.java | 5 +-
22 files changed, 180 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 94f317f..5c2c98f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -56,7 +56,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>114</maxAllowedViolations>
+ <logViolationsToConsole>true</logViolationsToConsole>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java b/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
index 1c3240b..a17023f 100644
--- a/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
+++ b/common/src/main/java/com/datatorrent/common/codec/JsonStreamCodec.java
@@ -18,7 +18,9 @@
*/
package com.datatorrent.common.codec;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Map;
import org.codehaus.jackson.JsonGenerator;
@@ -65,8 +67,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
}
});
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
logger.error("Caught exception when instantiating codec for class {}", entry.getKey().getName(), ex);
}
}
@@ -80,8 +81,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
ByteArrayInputStream bis = new ByteArrayInputStream(data.buffer, data.offset, data.length);
try {
return mapper.readValue(bis, Object.class);
- }
- catch (Exception ioe) {
+ } catch (Exception ioe) {
throw new RuntimeException(ioe);
}
}
@@ -95,8 +95,7 @@ public class JsonStreamCodec<T> implements StreamCodec<T>
mapper.writeValue(bos, o);
byte[] bytes = bos.toByteArray();
return new Slice(bytes, 0, bytes.length);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index 7c2a56a..bbf9753 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -18,17 +18,19 @@
*/
package com.datatorrent.common.experimental;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+
import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
/**
* Interface for App Data support. Experimental only. This interface will likely change in the near future.
*
@@ -96,6 +98,7 @@ public interface AppData
* @return The connection url used by the AppData Query or Result operator.
*/
public String getAppDataURL();
+
/**
* Returns the topic that the appdata Query or Result operator sends data to.
* @return The topic that the appdata Query or Result operator sends data to.
@@ -110,7 +113,10 @@ public interface AppData
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
- public @interface AppendQueryIdToTopic{boolean value() default false;}
+ public @interface AppendQueryIdToTopic
+ {
+ boolean value() default false;
+ }
/**
* Marker annotation for specifying appdata query ports.
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java b/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
index 4192354..e34f74a 100644
--- a/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
+++ b/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java
@@ -98,8 +98,7 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
String aggregatorDesc;
if (aggregatorName == null) {
aggregatorDesc = aggregator.getClass().getName();
- }
- else {
+ } else {
aggregatorDesc = aggregatorName.value();
}
return aggregatorDesc + aggregatorMetricSeparator + metric;
@@ -129,13 +128,13 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
* be used for the result of aggregators[i].
*/
public void addAggregators(@NotNull String metric, @NotNull SingleMetricAggregator[] aggregators,
- @NotNull String[] logicalMetricNames)
+ @NotNull String[] logicalMetricNames)
{
Preconditions.checkNotNull(metric, "metric");
Preconditions.checkNotNull(aggregators, "aggregators");
Preconditions.checkNotNull(logicalMetricNames, "logicalMetricNames");
- Preconditions.checkArgument(aggregators.length == logicalMetricNames.length, "different length aggregators and" +
- " logical names");
+ Preconditions.checkArgument(aggregators.length == logicalMetricNames.length,
+ "different length aggregators and logical names");
addAggregatorsHelper(metric, aggregators, logicalMetricNames);
}
@@ -149,8 +148,7 @@ public class MetricsAggregator implements AutoMetric.Aggregator, Serializable
for (int i = 0; i < aggregators.length; i++) {
String resultName = (logicalMetricNames == null || logicalMetricNames[i] == null) ?
- (aggregators.length == 1 ? metric : deriveLogicalMetricName(metric, aggregators[i]))
- : logicalMetricNames[i];
+ (aggregators.length == 1 ? metric : deriveLogicalMetricName(metric, aggregators[i])) : logicalMetricNames[i];
laggregators.add(new LogicalMetricMeta(aggregators[i], resultName));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java b/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
index 9b568d2..dbc9b8a 100644
--- a/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
+++ b/common/src/main/java/com/datatorrent/common/metric/SingleMetricAggregator.java
@@ -19,6 +19,7 @@
package com.datatorrent.common.metric;
import java.util.Collection;
+
/**
* <p>SingleMetricAggregator interface.</p>
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index c36dd8a..165d8cf 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -19,16 +19,22 @@
package com.datatorrent.common.partitioner;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.validation.constraints.Min;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
@@ -100,7 +106,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
logger.debug("define partitions, partitionCount current {} requested {}", partitions.size(), newPartitionCount);
//Get a partition
- DefaultPartition<T> partition = (DefaultPartition<T>) partitions.iterator().next();
+ DefaultPartition<T> partition = (DefaultPartition<T>)partitions.iterator().next();
Collection<Partition<T>> newPartitions;
if (partitions.iterator().next().getStats() == null) {
@@ -117,16 +123,13 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
if (inputPortList != null && !inputPortList.isEmpty()) {
DefaultPartition.assignPartitionKeys(newPartitions, inputPortList.iterator().next());
}
- }
- else {
+ } else {
// define partitions is being called again
if (context.getParallelPartitionCount() != 0) {
newPartitions = repartitionParallel(partitions, context);
- }
- else if (partition.getPartitionKeys().isEmpty()) {
+ } else if (partition.getPartitionKeys().isEmpty()) {
newPartitions = repartitionInputOperator(partitions);
- }
- else {
+ } else {
newPartitions = repartition(partitions);
}
}
@@ -166,8 +169,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
if (siblingPartition == null) {
lowLoadPartitions.put(partitionKey & reducedMask, p);
- }
- else {
+ } else {
// both of the partitions are low load, combine
PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
// put new value so the map gets marked as modified
@@ -178,8 +180,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
//LOG.debug("partition keys after merge {}", siblingPartition.getPartitionKeys());
}
}
- }
- else if (load > 0) {
+ } else if (load > 0) {
// split bottlenecks
Map<InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
Map.Entry<InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();
@@ -193,8 +194,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
int key = e.getValue().partitions.iterator().next();
int key2 = (newMask ^ e.getValue().mask) | key;
newKeys = Sets.newHashSet(key, key2);
- }
- else {
+ } else {
// assign keys to separate partitions
newMask = e.getValue().mask;
newKeys = e.getValue().partitions;
@@ -205,8 +205,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
newPartitions.add(newPartition);
}
- }
- else {
+ } else {
// leave unchanged
newPartitions.add(p);
}
@@ -232,16 +231,13 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
if (load < 0) {
if (!lowLoadPartitions.isEmpty()) {
newPartitions.add(lowLoadPartitions.remove(0));
- }
- else {
+ } else {
lowLoadPartitions.add(p);
}
- }
- else if (load > 0) {
+ } else if (load > 0) {
newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
- }
- else {
+ } else {
newPartitions.add(p);
}
}
@@ -259,7 +255,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
* @return new adjusted partitions
*/
public static <T extends Operator> Collection<Partition<T>> repartitionParallel(Collection<Partition<T>> partitions,
- PartitioningContext context)
+ PartitioningContext context)
{
List<Partition<T>> newPartitions = Lists.newArrayList();
newPartitions.addAll(partitions);
@@ -273,8 +269,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>,
partitionIterator.next();
partitionIterator.remove();
}
- }
- else {
+ } else {
//Add more partitions
T anOperator = newPartitions.iterator().next().getPartitionedInstance();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java b/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
index 5792ad5..779a156 100644
--- a/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
+++ b/common/src/main/java/com/datatorrent/common/security/auth/callback/DefaultCallbackHandler.java
@@ -20,14 +20,18 @@ package com.datatorrent.common.security.auth.callback;
import java.io.IOException;
-import javax.security.auth.callback.*;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.TextOutputCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Component;
-
import com.datatorrent.common.security.SecurityContext;
/**
@@ -78,7 +82,7 @@ public class DefaultCallbackHandler implements CallbackHandler, Component<Securi
PasswordCallback passcb = (PasswordCallback)callback;
passcb.setPassword(context.getValue(SecurityContext.PASSWORD));
} else if (callback instanceof RealmCallback) {
- RealmCallback realmcb = (RealmCallback) callback;
+ RealmCallback realmcb = (RealmCallback)callback;
realmcb.setText(context.getValue(SecurityContext.REALM));
} else if (callback instanceof TextOutputCallback) {
TextOutputCallback textcb = (TextOutputCallback)callback;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 83bbdca..788a68c 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -18,16 +18,26 @@
*/
package com.datatorrent.common.util;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
import java.nio.file.Files;
import java.util.EnumSet;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -74,7 +84,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
@Override
public void save(final Object object, final int operatorId, final long windowId) throws IOException
{
- if(syncCheckpoint){
+ if (syncCheckpoint) {
super.save(object, operatorId, windowId);
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
index 5b00e44..4601f3a 100644
--- a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
@@ -21,8 +21,6 @@ package com.datatorrent.common.util;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
-import com.datatorrent.common.util.SerializableObject;
-
/**
* Base class for operator implementations that provides empty implementations
* for all interface methods.
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java b/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
index 328ec7b..86ce23d 100644
--- a/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BasicContainerOptConfigurator.java
@@ -63,11 +63,10 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
for (DAG.OperatorMeta operatorMeta : operatorMetaList) {
Map<String, Object> operatorMap = parseJvmOpts(operatorMeta.getValue(Context.OperatorContext.JVM_OPTIONS), operatorMeta.getValue(Context.OperatorContext.MEMORY_MB));
LOG.info("property map for operator {}", operatorMap);
- Set<String> operatorPropertySet = (Set<String>) operatorMap.get(GENERIC);
+ Set<String> operatorPropertySet = (Set<String>)operatorMap.get(GENERIC);
if (genericProperties == null) {
genericProperties = operatorPropertySet;
- }
- else {
+ } else {
if (operatorPropertySet != null && !genericProperties.equals(operatorPropertySet)) {
throw new AssertionError("Properties don't match: " + genericProperties + " " + operatorPropertySet);
}
@@ -77,15 +76,15 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
for (Map<String, Object> map : jvmOptsList) {
String value;
if (map.containsKey(XMX)) {
- value = (String) map.get(XMX);
+ value = (String)map.get(XMX);
xmx += getOptValue(value);
}
if (map.containsKey(XMS)) {
- value = (String) map.get(XMS);
+ value = (String)map.get(XMS);
xms += getOptValue(value);
}
if (map.containsKey(XSS)) {
- value = (String) map.get(XSS);
+ value = (String)map.get(XSS);
xss += getOptValue(value);
}
}
@@ -110,14 +109,11 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
long result;
if (value.endsWith("g") || value.endsWith("G")) {
result = Long.valueOf(value.substring(0, value.length() - 1)) * GB_TO_B;
- }
- else if (value.endsWith("m") || value.endsWith("M")) {
+ } else if (value.endsWith("m") || value.endsWith("M")) {
result = Long.valueOf(value.substring(0, value.length() - 1)) * MB_TO_B;
- }
- else if (value.endsWith("k") || value.endsWith("K")) {
+ } else if (value.endsWith("k") || value.endsWith("K")) {
result = Long.valueOf(value.substring(0, value.length() - 1)) * KB_TO_B;
- }
- else {
+ } else {
result = Long.valueOf(value);
}
return result;
@@ -151,7 +147,7 @@ public class BasicContainerOptConfigurator implements Context.ContainerOptConfig
int memoryOverhead = memory / 4;
int heapSize = memory - memoryOverhead;
if (memoryOverhead > 1024) {
- heapSize = memory - 1024;
+ heapSize = memory - 1024;
}
map.put(XMX, heapSize + "m");
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index c2f68a0..fd4c450 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,16 +18,28 @@
*/
package com.datatorrent.common.util;
-import java.io.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
@@ -35,7 +47,6 @@ import com.google.common.collect.Lists;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -71,12 +82,10 @@ public class FSStorageAgent implements StorageAgent, Serializable
if (pathUri.getScheme() != null) {
fileContext = FileContext.getFileContext(pathUri, conf == null ? new Configuration() : conf);
- }
- else {
+ } else {
fileContext = FileContext.getFileContext(conf == null ? new Configuration() : conf);
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -95,27 +104,23 @@ public class FSStorageAgent implements StorageAgent, Serializable
Options.CreateOpts.CreateParent.createParent());
store(stream, object);
stateSaved = true;
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
logger.debug("while saving {} {}", operatorId, window, t);
stateSaved = false;
DTThrowable.rethrow(t);
- }
- finally {
+ } finally {
try {
if (stream != null) {
stream.close();
}
- }
- catch (IOException ie) {
+ } catch (IOException ie) {
stateSaved = false;
throw new RuntimeException(ie);
- }
- finally {
+ } finally {
if (stateSaved) {
logger.debug("Saving {}: {}", operatorId, window);
fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window),
- Options.Rename.OVERWRITE);
+ Options.Rename.OVERWRITE);
}
}
}
@@ -130,8 +135,7 @@ public class FSStorageAgent implements StorageAgent, Serializable
FSDataInputStream stream = fileContext.open(lPath);
try {
return retrieve(stream);
- }
- finally {
+ } finally {
stream.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 2f1735f..7723fed 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -19,13 +19,17 @@
package com.datatorrent.common.util;
import java.io.IOException;
+
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
+
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.Version;
-import org.codehaus.jackson.map.*;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.module.SimpleModule;
import org.codehaus.jackson.map.ser.std.RawSerializer;
import org.codehaus.jettison.json.JSONArray;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java b/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
index f3aa170..f0c61b5 100644
--- a/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
+++ b/common/src/main/java/com/datatorrent/common/util/NameableThreadFactory.java
@@ -54,9 +54,7 @@ public class NameableThreadFactory implements ThreadFactory
@Override
public Thread newThread(Runnable r)
{
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon() != this.isDaemon) {
t.setDaemon(isDaemon);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
index 1704edb..f263d0c 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessage.java
@@ -53,7 +53,8 @@ public class PubSubMessage<T>
return identifier;
}
- public static PubSubMessageType getPubSubMessageType(String identifier) {
+ public static PubSubMessageType getPubSubMessageType(String identifier)
+ {
PubSubMessageType pubSubMessageType = null;
for (PubSubMessageType value : PubSubMessageType.values()) {
if (value.getIdentifier().equals(identifier)) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index aa84213..926fa8f 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -36,11 +36,13 @@ public class PubSubMessageCodec<T>
private final ObjectMapper mapper;
- public PubSubMessageCodec(ObjectMapper mapper) {
+ public PubSubMessageCodec(ObjectMapper mapper)
+ {
this.mapper = mapper;
}
- public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException {
+ public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
+ {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
@@ -59,7 +61,8 @@ public class PubSubMessageCodec<T>
* @throws IOException
*/
@SuppressWarnings({"unchecked"})
- public PubSubMessage<T> parseMessage(String message) throws IOException {
+ public PubSubMessage<T> parseMessage(String message) throws IOException
+ {
HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java b/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
index 58072ee..c3f5961 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubWebSocketClient.java
@@ -21,13 +21,12 @@ package com.datatorrent.common.util;
import java.io.IOException;
import java.net.URI;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import com.ning.http.client.*;
-import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
-import com.ning.http.client.websocket.*;
-
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -36,11 +35,19 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.ning.http.client.AsyncCompletionHandler;
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
+import com.ning.http.client.AsyncHttpClientConfigBean;
+import com.ning.http.client.Cookie;
+import com.ning.http.client.Response;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
-
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -71,14 +78,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
try {
pubSubMessage = codec.parseMessage(message);
PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
- }
- catch (JsonParseException jpe) {
+ } catch (JsonParseException jpe) {
logger.warn("Ignoring unparseable JSON message: {}", message, jpe);
- }
- catch (JsonMappingException jme) {
+ } catch (JsonMappingException jme) {
logger.warn("Ignoring JSON mapping in message: {}", message, jme);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
onError(ex);
}
}
@@ -175,8 +179,7 @@ public abstract class PubSubWebSocketClient implements Component<Context>
try {
json.put("userName", userName);
json.put("password", password);
- }
- catch (JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
Response response = client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute().get();
@@ -201,8 +204,7 @@ public abstract class PubSubWebSocketClient implements Component<Context>
try {
json.put("userName", userName);
json.put("password", password);
- }
- catch (JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute(new AsyncCompletionHandler<Response>()
@@ -223,9 +225,8 @@ public abstract class PubSubWebSocketClient implements Component<Context>
}
});
- }
- else {
- client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()
+ } else {
+ final PubSubWebSocket webSocket = new PubSubWebSocket()
{
@Override
public void onOpen(WebSocket ws)
@@ -233,8 +234,9 @@ public abstract class PubSubWebSocketClient implements Component<Context>
connection = ws;
super.onOpen(ws);
}
-
- }).build());
+ };
+ client.prepareGet(uri.toString()).execute(
+ new WebSocketUpgradeHandler.Builder().addWebSocketListener(webSocket).build());
}
}
@@ -289,11 +291,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
Throwable t = throwable.get();
if (t instanceof IOException) {
throw (IOException)t;
- }
- else {
+ } else {
DTThrowable.rethrow(t);
}
}
+
/**
* <p>publish.</p>
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/SerializableObject.java b/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
index 203399f..caed968 100644
--- a/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
+++ b/common/src/main/java/com/datatorrent/common/util/SerializableObject.java
@@ -22,8 +22,8 @@ import java.io.ObjectStreamException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,37 +58,21 @@ public class SerializableObject implements Serializable
Constructor<? extends SerializableObject> constructor = this.getClass().getConstructor(this.getClass());
try {
constructor.setAccessible(true);
- }
- catch (SecurityException ex) {
+ } catch (SecurityException ex) {
logger.warn("Accessing copy constructor {} failed.", constructor, ex);
}
try {
return constructor.newInstance(this);
- }
- catch (InstantiationException ex) {
- throw new RuntimeException("Instantiation using copy constructor failed!", ex);
- }
- catch (IllegalAccessException ex) {
+ } catch (ReflectiveOperationException | IllegalArgumentException ex) {
throw new RuntimeException("Instantiation using copy constructor failed!", ex);
}
- catch (IllegalArgumentException ex) {
- throw new RuntimeException("Instantiation using copy constructor failed!", ex);
- }
- catch (InvocationTargetException ex) {
- throw new RuntimeException("Instantiation using copy constructor failed!", ex);
- }
- }
- catch (NoSuchMethodException snme) {
+ } catch (NoSuchMethodException snme) {
logger.debug("No copy constructor detected for class {}, trying default constructor.", this.getClass().getSimpleName());
try {
SerializableObject newInstance = this.getClass().newInstance();
transferStateTo(newInstance);
return newInstance;
- }
- catch (IllegalAccessException ex) {
- throw new RuntimeException("Deserialization using default constructor failed!", ex);
- }
- catch (InstantiationException ex) {
+ } catch (ReflectiveOperationException ex) {
throw new RuntimeException("Deserialization using default constructor failed!", ex);
}
}
@@ -108,27 +92,20 @@ public class SerializableObject implements Serializable
if (!(Modifier.isFinal(modifiers) && Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers))) {
try {
field.setAccessible(true);
- }
- catch (SecurityException ex) {
+ } catch (SecurityException ex) {
logger.warn("Cannot set field {} accessible.", field, ex);
}
try {
field.set(dest, field.get(this));
- }
- catch (IllegalArgumentException ex) {
+ } catch (IllegalArgumentException ex) {
throw new RuntimeException("Getter/Setter argument failed using reflection on " + field, ex);
- }
- catch (IllegalAccessException ex) {
+ } catch (IllegalAccessException ex) {
throw new RuntimeException("Getter/Setter access failed using reflection on " + field, ex);
}
if (!field.getType().isPrimitive()) {
try {
field.set(this, null);
- }
- catch (IllegalArgumentException ex) {
- logger.warn("Failed to set field {} to null; generally it's harmless, but with reference counted data structure this may be an issue.", field, ex);
- }
- catch (IllegalAccessException ex) {
+ } catch (IllegalArgumentException | IllegalAccessException ex) {
logger.warn("Failed to set field {} to null; generally it's harmless, but with reference counted data structure this may be an issue.", field, ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
index d99a186..b31009e 100644
--- a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
+++ b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
@@ -18,13 +18,15 @@
*/
package com.datatorrent.common.codec;
-import com.datatorrent.api.StringCodec;
import java.util.HashMap;
import java.util.Map;
+
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
+import com.datatorrent.api.StringCodec;
+
public class JsonStreamCodecTest
{
static class PojoClass
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index 25e5fcc..687957c 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -25,9 +25,13 @@ import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Lists;
+
import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.api.StringCodec.Object2String;
@@ -97,7 +101,7 @@ public class StatelessPartitionerTest
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size());
- for(Partition<DummyOperator> partition: newPartitions) {
+ for (Partition<DummyOperator> partition : newPartitions) {
Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
}
}
@@ -115,7 +119,7 @@ public class StatelessPartitionerTest
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size());
- for(Partition<DummyOperator> partition: newPartitions) {
+ for (Partition<DummyOperator> partition : newPartitions) {
Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
}
}
@@ -138,7 +142,7 @@ public class StatelessPartitionerTest
partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
- new PartitioningContextImpl(null, 5));
+ new PartitioningContextImpl(null, 5));
Assert.assertEquals("after partition", 5, newPartitions.size());
}
@@ -155,7 +159,7 @@ public class StatelessPartitionerTest
}
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
- new PartitioningContextImpl(null, 1));
+ new PartitioningContextImpl(null, 1));
Assert.assertEquals("after partition", 1, newPartitions.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
index e2522cb..e644846 100644
--- a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -22,15 +22,16 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.collect.Maps;
import com.datatorrent.api.Attribute;
@@ -85,7 +86,7 @@ public class AsyncFSStorageAgentTest
testMeta.storageAgent.save(data, 1, 1);
testMeta.storageAgent.copyToHDFS(1, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+ Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
Assert.assertEquals("dataOf1", data, decoded);
}
@@ -107,10 +108,10 @@ public class AsyncFSStorageAgentTest
testMeta.storageAgent.save(dataOf2, 2, 1);
testMeta.storageAgent.copyToHDFS(2, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+ Map<Integer, String> decoded1 = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+ Map<Integer, String> decoded2 = (Map<Integer, String>)testMeta.storageAgent.load(2, 1);
Assert.assertEquals("data of 1", dataOf1, decoded1);
Assert.assertEquals("data of 2", dataOf2, decoded2);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
index e9fc1ea..0d6e38b 100644
--- a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java
@@ -22,15 +22,16 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.collect.Maps;
import com.datatorrent.api.Attribute;
@@ -50,8 +51,7 @@ public class FSStorageAgentTest
applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
try {
FileUtils.forceMkdir(new File("target/" + description.getClassName()));
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
storageAgent = new FSStorageAgent(applicationPath, null);
@@ -65,8 +65,7 @@ public class FSStorageAgentTest
{
try {
FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -84,7 +83,7 @@ public class FSStorageAgentTest
data.put(3, "three");
testMeta.storageAgent.save(data, 1, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+ Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
Assert.assertEquals("dataOf1", data, decoded);
}
@@ -104,10 +103,10 @@ public class FSStorageAgentTest
testMeta.storageAgent.save(dataOf1, 1, 1);
testMeta.storageAgent.save(dataOf2, 2, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+ Map<Integer, String> decoded1 = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+ Map<Integer, String> decoded2 = (Map<Integer, String>)testMeta.storageAgent.load(2, 1);
Assert.assertEquals("data of 1", dataOf1, decoded1);
Assert.assertEquals("data of 2", dataOf2, decoded2);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/778436f7/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index c44b953..97debe3 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -24,13 +24,14 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Test;
-import static org.junit.Assert.*;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
+import static org.junit.Assert.assertEquals;
+
/**
*
*/
@@ -124,4 +125,4 @@ public class SerializableObjectTest
assertEquals("Serialized Deserialized Objects", pre, post);
}
-}
\ No newline at end of file
+}
[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-268' of
https://github.com/chandnisingh/incubator-apex-core into APEXCORE-268
Posted by vr...@apache.org.
Merge branch 'APEXCORE-268' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-268
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aded30c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aded30c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aded30c4
Branch: refs/heads/devel-3
Commit: aded30c45cc7bd2dba08243c5d0536fe9fe13bb1
Parents: b14583c 778436f
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Jan 10 09:52:05 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Jan 10 09:52:05 2016 -0800
----------------------------------------------------------------------
common/pom.xml | 2 +-
.../common/codec/JsonStreamCodec.java | 13 +++--
.../common/experimental/AppData.java | 14 ++++--
.../common/metric/MetricsAggregator.java | 12 ++---
.../common/metric/SingleMetricAggregator.java | 1 +
.../partitioner/StatelessPartitioner.java | 51 +++++++++-----------
.../auth/callback/DefaultCallbackHandler.java | 10 ++--
.../common/util/AsyncFSStorageAgent.java | 20 ++++++--
.../datatorrent/common/util/BaseOperator.java | 2 -
.../util/BasicContainerOptConfigurator.java | 22 ++++-----
.../datatorrent/common/util/FSStorageAgent.java | 42 ++++++++--------
.../util/JacksonObjectMapperProvider.java | 6 ++-
.../common/util/NameableThreadFactory.java | 4 +-
.../datatorrent/common/util/PubSubMessage.java | 3 +-
.../common/util/PubSubMessageCodec.java | 9 ++--
.../common/util/PubSubWebSocketClient.java | 50 ++++++++++---------
.../common/util/SerializableObject.java | 41 ++++------------
.../common/codec/JsonStreamCodecTest.java | 4 +-
.../partitioner/StatelessPartitionerTest.java | 14 ++++--
.../common/util/AsyncFSStorageAgentTest.java | 13 ++---
.../common/util/FSStorageAgentTest.java | 19 ++++----
.../common/util/SerializableObjectTest.java | 5 +-
22 files changed, 180 insertions(+), 177 deletions(-)
----------------------------------------------------------------------