You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by br...@apache.org on 2015/11/19 21:35:06 UTC
[3/4] incubator-apex-malhar git commit: MLHR-1912 #resolve #comment
fixed style violations
MLHR-1912 #resolve #comment fixed style violations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b9aa203d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b9aa203d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b9aa203d
Branch: refs/heads/devel-3
Commit: b9aa203d7c411181665362dc0cee7420c7d61291
Parents: 2fe4bec
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Nov 19 08:30:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 19 11:16:17 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/algo/UniqueValueCount.java | 13 +-
.../lib/codec/KryoSerializableStreamCodec.java | 4 +-
.../AbstractDBLookupCacheBackedOperator.java | 16 ++-
.../datatorrent/lib/db/cache/CacheManager.java | 27 +++--
.../datatorrent/lib/db/cache/CacheStore.java | 5 +-
.../lib/db/jdbc/AbstractJdbcInputOperator.java | 15 +--
...stractJdbcTransactionableOutputOperator.java | 20 ++--
.../db/jdbc/JDBCLookupCacheBackedOperator.java | 15 +--
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 55 ++++++---
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 98 +++++++++------
.../com/datatorrent/lib/db/jdbc/JdbcStore.java | 14 +--
.../lib/db/jdbc/JdbcTransactionalStore.java | 36 +++---
.../lib/io/AbstractFTPInputOperator.java | 14 +--
.../lib/io/fs/AbstractFileOutputOperator.java | 23 ++--
.../lib/io/jms/AbstractJMSInputOperator.java | 85 +++++++------
.../lib/io/jms/JMSStringInputOperator.java | 10 +-
.../datatorrent/lib/metric/AvgAggregator.java | 3 +-
.../lib/metric/max/DoubleMaxAggregator.java | 3 +-
.../lib/metric/max/FloatMaxAggregator.java | 3 +-
.../lib/metric/max/IntMaxAggregator.java | 3 +-
.../lib/metric/max/LongMaxAggregator.java | 1 -
.../lib/metric/min/DoubleMinAggregator.java | 3 +-
.../lib/metric/min/FloatMinAggregator.java | 3 +-
.../lib/metric/min/IntMinAggregator.java | 3 +-
.../lib/metric/min/LongMinAggregator.java | 3 +-
.../lib/codec/KryoStreamCodecTest.java | 120 ++++++++++---------
.../lib/db/cache/CacheManagerTest.java | 2 +-
.../jdbc/JDBCLookupCacheBackedOperatorTest.java | 33 ++---
.../lib/db/jdbc/JdbcOperatorTest.java | 43 ++++---
.../datatorrent/lib/db/jdbc/JdbcStoreTest.java | 7 +-
.../lib/io/FTPStringInputOperatorTest.java | 7 +-
.../io/fs/AbstractFileOutputOperatorTest.java | 4 +-
.../lib/io/jms/JMSStringInputOperatorTest.java | 39 +++---
pom.xml | 2 +-
34 files changed, 388 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
index 3d5ed3b..6f8750d 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
@@ -24,15 +24,13 @@ import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.common.util.BaseOperator;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
* This operator counts the number of unique values corresponding to a key within a window.
@@ -89,20 +87,21 @@ public class UniqueValueCount<K> extends BaseOperator
@SuppressWarnings({"rawtypes", "unchecked"})
public Unifier<KeyValPair<K, Integer>> getUnifier()
{
- return (Unifier) new UniqueCountUnifier<K>();
+ return (Unifier)new UniqueCountUnifier<K>();
}
};
/**
* The output port which emits key and set containing unique values
*/
- public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
+ public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues =
+ new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
{
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Unifier<KeyValPair<K, Set<Object>>> getUnifier()
{
- return (Unifier) new UniqueCountSetUnifier<K>();
+ return (Unifier)new UniqueCountSetUnifier<K>();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
index ecfa422..99e0a6e 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
@@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
+
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.util.Slice;
@@ -56,7 +57,8 @@ public class KryoSerializableStreamCodec<T> implements StreamCodec<T>, Serializa
}
/**
- * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is more efficient.
+ * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is
+ * more efficient.
*
* @param clazz class to register with Kryo.
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
index 1510adf..42e02d3 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
@@ -29,7 +29,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
-
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.util.KeyValPair;
@@ -53,7 +52,8 @@ import com.datatorrent.lib.util.KeyValPair;
* @param <S> type of store
* @since 0.9.1
*/
-public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable> implements Operator, CacheManager.Backup
+public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable>
+ implements Operator, CacheManager.Backup
{
@NotNull
protected S store;
@@ -83,11 +83,11 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
Object value = cacheManager.get(key);
if (value != null) {
- output.emit(new KeyValPair<Object, Object>(key, value));
+ output.emit(new KeyValPair<>(key, value));
}
}
- public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<KeyValPair<Object, Object>>();
+ public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<>();
@Override
public void beginWindow(long l)
@@ -107,8 +107,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
cacheManager.setBackup(this);
try {
cacheManager.initialize();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -118,8 +117,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
{
try {
cacheManager.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("closing manager", e);
}
}
@@ -171,6 +169,6 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
*/
protected abstract Object getKeyFromTuple(T tuple);
- private final static Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
index 6d9f89d..2ea31f1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
@@ -20,7 +20,12 @@ package com.datatorrent.lib.db.cache;
import java.io.Closeable;
import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -36,15 +41,15 @@ import com.datatorrent.lib.db.KeyValueStore;
/**
* Manages primary and secondary stores.<br/>
- * <p>
- * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup store and retrieves the value.<br/>
+ * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup
+ * store and retrieves the value.<br/>
* If the key was present in the backup store, its value is returned and also saved in the primary store.
- * </p>
- * <p>
- * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like databases.<br/>
- * Store Manager can also refresh the values of keys at a specified time every day. This time is in format HH:mm:ss Z.<br/>
+ * <p/>
+ * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like
+ * databases.<br/>
+ * Store Manager can also refresh the values of keys at a specified time every day.
+ * This time is in format HH:mm:ss Z.<br/>
* This is not thread-safe.
- * </p>
*
* @since 0.9.2
*/
@@ -180,7 +185,7 @@ public class CacheManager implements Closeable
/**
* A primary store should also provide setting the value for a key.
*/
- public static interface Primary extends KeyValueStore
+ public interface Primary extends KeyValueStore
{
/**
@@ -195,7 +200,7 @@ public class CacheManager implements Closeable
* Backup store is queried when {@link Primary} doesn't contain a key.<br/>
* It also provides data needed at startup.<br/>
*/
- public static interface Backup extends KeyValueStore
+ public interface Backup extends KeyValueStore
{
/**
* <br>Backup stores are also used to initialize primary stores. This fetches initialization data.</br>
@@ -206,6 +211,6 @@ public class CacheManager implements Closeable
}
@SuppressWarnings("unused")
- private final static Logger LOG = LoggerFactory.getLogger(CacheManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index d10fe13..72063ee 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -114,8 +114,7 @@ public class CacheStore implements CacheManager.Primary
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) {
cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
- }
- else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
+ } else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
}
cache = cacheBuilder.build();
@@ -186,7 +185,7 @@ public class CacheStore implements CacheManager.Primary
/**
* Strategies for time-based expiration of entries.
*/
- public static enum ExpiryType
+ public enum ExpiryType
{
/**
* Only expire the entries after the specified duration has passed since the entry was last accessed by a read
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
index 1cb7635..fe6b077 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.db.AbstractStoreInputOperator;
* and emits the data as tuples.
* Subclasses should implement the methods required to read the data from the database.
* <p>
- * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
+ * This is an abstract class. Sub-classes need to implement
+ * {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
* </p>
* @displayName Abstract JDBC Input
* @category Input
@@ -84,17 +85,14 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
outputPort.emit(tuple);
}
while (result.next());
- }
- else {
+ } else {
// No rows available wait for some time before retrying so as to not continuously slam the database
Thread.sleep(waitForDataTimeout);
}
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
store.disconnect();
throw new RuntimeException(String.format("Error while running query: %s", query), ex);
- }
- catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@@ -106,8 +104,7 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
super.setup(context);
try {
queryStatement = store.getConnection().createStatement();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("creating query", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index b57a9b1..77b76c1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -22,8 +22,8 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
-import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
-
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
/**
@@ -40,7 +39,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
* <p>
* This operator creates a transaction at the start of window, executes batches of sql updates,
* and closes the transaction at the end of the window. Each tuple corresponds to an SQL update statement.
- * The operator groups the updates in a batch and submits them with one call to the database. Batch processing improves performance considerably.<br/>
+ * The operator groups the updates in a batch and submits them with one call to the database. Batch processing
+ * improves performance considerably.<br/>
* The size of a batch is configured by batchSize property.
* </p>
* <p>
@@ -55,7 +55,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
* @param <T> type of tuple
* @since 0.9.4
*/
-public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
+public abstract class AbstractJdbcTransactionableOutputOperator<T>
+ extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
{
protected static int DEFAULT_BATCH_SIZE = 1000;
@@ -80,8 +81,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
super.setup(context);
try {
updateCommand = store.connection.prepareStatement(getUpdateCommand());
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -117,11 +117,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
}
updateCommand.executeBatch();
updateCommand.clearBatch();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("processing batch", e);
- }
- finally {
+ } finally {
batchStartIdx += tuples.size() - batchStartIdx;
}
}
@@ -142,7 +140,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
*
* @return the sql statement to update a tuple in the database.
*/
- @Nonnull
+ @NotNull
protected abstract String getUpdateCommand();
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
index 93006eb..7ca2ae1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
@@ -78,8 +78,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
try {
putStatement = store.connection.prepareStatement(insertQuery);
getStatement = store.connection.prepareStatement(getQuery);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -90,8 +89,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
try {
preparePutStatement(putStatement, key, value);
putStatement.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while executing insert", e);
}
}
@@ -103,8 +101,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
prepareGetStatement(getStatement, key);
ResultSet resultSet = getStatement.executeQuery();
return processResultSet(resultSet);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while fetching key", e);
}
}
@@ -118,8 +115,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
prepareGetStatement(getStatement, key);
ResultSet resultSet = getStatement.executeQuery();
values.add(processResultSet(resultSet));
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while fetching keys", e);
}
}
@@ -128,7 +124,8 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
protected abstract void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException;
- protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException;
+ protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value)
+ throws SQLException;
protected abstract String fetchInsertQuery();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index eafff3c..3aa6fac 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -19,7 +19,14 @@
package com.datatorrent.lib.db.jdbc;
import java.math.BigDecimal;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import java.util.Map;
@@ -37,7 +44,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
@@ -49,8 +55,8 @@ import com.datatorrent.lib.util.PojoUtils;
*
* For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/>
*
- * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is most efficient
- * when the tables/views are indexed and the query uses this information to retrieve data.<br/>
+ * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is
+ * most efficient when the tables/views are indexed and the query uses this information to retrieve data.<br/>
* This can be achieved in sub-classes by overriding {@link #queryToRetrieveData()} and {@link #setRuntimeParams()}.
*
* @displayName Jdbc Input Operator
@@ -58,7 +64,8 @@ import com.datatorrent.lib.util.PojoUtils;
* @tags database, sql, pojo, jdbc
* @since 2.1.0
*/
-public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext>
+public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
+ implements Operator.ActivationListener<Context.OperatorContext>
{
private static int DEF_FETCH_SIZE = 100;
@@ -363,53 +370,65 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> imp
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
String.class);
break;
case (Types.BOOLEAN):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.TINYINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.SMALLINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.INTEGER):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.BIGINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.FLOAT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.DOUBLE):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DECIMAL:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
- BigDecimal.class);
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ BigDecimal.class);
break;
case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
index 5399c44..ffb4160 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -18,12 +18,32 @@
*/
package com.datatorrent.lib.db.jdbc;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
@@ -34,18 +54,6 @@ import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterShort;
-import java.math.BigDecimal;
-import java.sql.*;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
/**
* <p>
* JdbcPOJOOutputOperator class.</p>
@@ -57,7 +65,8 @@ import com.google.common.collect.Lists;
* @since 2.1.0
*/
@Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext>
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+ implements Operator.ActivationListener<OperatorContext>
{
@NotNull
private List<FieldInfo> fieldInfos;
@@ -165,51 +174,51 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.BOOLEAN):
- statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.TINYINT):
- statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.SMALLINT):
- statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.INTEGER):
- statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.BIGINT):
- statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.FLOAT):
- statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.DOUBLE):
- statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.DECIMAL:
- statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.TIMESTAMP:
- statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
case Types.TIME:
- statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
case Types.DATE:
- statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
default:
@@ -271,53 +280,64 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
String.class);
break;
case (Types.BOOLEAN):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.TINYINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.SMALLINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.INTEGER):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.BIGINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.FLOAT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.DOUBLE):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DECIMAL:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
- BigDecimal.class);
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
break;
case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
index 2e284ad..d9901aa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
@@ -32,8 +32,8 @@ import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.Connectable;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* A {@link Connectable} that uses jdbc to connect to stores.
@@ -125,7 +125,8 @@ public class JdbcStore implements Connectable
*/
public void setConnectionProperties(String connectionProps)
{
- String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults().split(connectionProps), String.class);
+ String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults()
+ .split(connectionProps), String.class);
for (int i = 0; i < properties.length; i += 2) {
if (i + 1 < properties.length) {
connectionProperties.put(properties[i], properties[i + 1]);
@@ -163,8 +164,7 @@ public class JdbcStore implements Connectable
connection = DriverManager.getConnection(databaseUrl, connectionProperties);
logger.debug("JDBC connection Success");
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
DTThrowable.rethrow(t);
}
}
@@ -177,8 +177,7 @@ public class JdbcStore implements Connectable
{
try {
connection.close();
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
throw new RuntimeException("closing database resource", ex);
}
}
@@ -188,8 +187,7 @@ public class JdbcStore implements Connectable
{
try {
return !connection.isClosed();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("is isConnected", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
index e4a7229..9bc18b0 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
@@ -24,10 +24,11 @@ import java.sql.SQLException;
import javax.validation.constraints.NotNull;
-import com.datatorrent.lib.db.TransactionableStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.db.TransactionableStore;
+
/**
* <p>JdbcTransactionalStore class.</p>
*
@@ -117,7 +118,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
super.connect();
try {
String command = "select " + metaTableWindowColumn + " from " + metaTable + " where " + metaTableAppIdColumn +
- " = ? and " + metaTableOperatorIdColumn + " = ?";
+ " = ? and " + metaTableOperatorIdColumn + " = ?";
logger.debug(command);
lastWindowFetchCommand = connection.prepareStatement(command);
@@ -126,18 +127,18 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
logger.debug(command);
lastWindowInsertCommand = connection.prepareStatement(command);
- command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? " +
- " and " + metaTableOperatorIdColumn + " = ?";
+ command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? "
+ + " and " + metaTableOperatorIdColumn + " = ?";
logger.debug(command);
lastWindowUpdateCommand = connection.prepareStatement(command);
- command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn + " = ?";
+ command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn
+ + " = ?";
logger.debug(command);
lastWindowDeleteCommand = connection.prepareStatement(command);
connection.setAutoCommit(false);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -148,8 +149,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
if (lastWindowUpdateCommand != null) {
try {
lastWindowUpdateCommand.close();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -168,8 +168,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
try {
connection.commit();
inTransaction = false;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -180,8 +179,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
try {
connection.rollback();
inTransaction = false;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -231,16 +229,14 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
ResultSet resultSet = lastWindowFetchCommand.executeQuery();
if (resultSet.next()) {
lastWindow = resultSet.getLong(1);
- }
- else {
+ } else {
lastWindowInsertCommand.setString(1, appId);
lastWindowInsertCommand.setInt(2, operatorId);
lastWindowInsertCommand.setLong(3, -1);
lastWindowInsertCommand.executeUpdate();
}
return lastWindow;
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
@@ -253,8 +249,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
lastWindowUpdateCommand.setString(2, appId);
lastWindowUpdateCommand.setInt(3, operatorId);
lastWindowUpdateCommand.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -266,8 +261,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
lastWindowDeleteCommand.setString(1, appId);
lastWindowDeleteCommand.setInt(2, operatorId);
lastWindowDeleteCommand.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
index 1cf57c5..d34f99b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
@@ -27,15 +27,15 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.net.ftp.FTP;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ftp.FTPFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultOutputPort;
-
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
@@ -90,10 +90,10 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera
{
super.partitioned(partitions);
for (Partition<AbstractFileInputOperator<T>> partition : partitions.values()) {
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).host = host;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).port = port;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).userName = userName;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).password = password;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).host = host;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).port = port;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).userName = userName;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).password = password;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 2aa658f..15a208f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -217,7 +217,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
/**
* File output counters.
*/
- protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
+ protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class);
protected StreamCodec<INPUT> streamCodec;
@@ -252,8 +252,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* accessed by a read or write.
* <p/>
* https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/>
- * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort.
- * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/>
+ * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a
+ * value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write
+ * operations, or during occasional read operations if writes are rare.<br/>
* This isn't the most effective way but adds a little bit of optimization.
*/
private Long expireStreamAfterAcessMillis;
@@ -275,8 +276,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
if (AbstractFileOutputOperator.this.streamCodec == null) {
return super.getStreamCodec();
- }
- else {
+ } else {
return streamCodec;
}
}
@@ -414,8 +414,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("rotating file at setup.");
rotate(seenFileName);
}
- }
- catch (IOException | ExecutionException e) {
+ } catch (IOException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@@ -441,8 +440,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* @param filename name of the actual file.
* @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
* latest open part file name.
- * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; otherwise
- * path of the actual file.
+ * @param filepath path of the file. When always writing to temp file, this is the path of the temp file;
+ * otherwise path of the actual file.
* @throws IOException
*/
private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
@@ -714,7 +713,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//Close all the streams you can
Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
- for(String seenFileName: openStreams.keySet()) {
+ for (String seenFileName : openStreams.keySet()) {
FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName);
try {
long start = System.currentTimeMillis();
@@ -1238,8 +1237,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName);
rename(srcPath, destPath);
} else if (fs.exists(srcPath)) {
- //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so
- //we just delete the tmp file.
+ /*if the destination and src both exists that means there was a failure between file rename and clearing the
+ endOffset so we just delete the tmp file*/
LOG.debug("deleting tmp {}", tmpFileName);
fs.delete(srcPath, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index fe9e35f..43445a7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -25,14 +25,22 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
-import javax.jms.*;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
-import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.mutable.MutableLong;
+
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,10 +51,9 @@ import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* This is the base implementation of a JMS input operator.<br/>
@@ -71,8 +78,9 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
* @since 0.3.2
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, ActivationListener<OperatorContext>,
- MessageListener, ExceptionListener, Operator.IdleTimeHandler, Operator.CheckpointListener
+public abstract class AbstractJMSInputOperator<T> extends JMSBase
+ implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
+ Operator.IdleTimeHandler, Operator.CheckpointListener
{
protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
@@ -102,8 +110,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
protected transient long currentWindowId;
protected transient int emitCount;
- private transient final Set<String> pendingAck;
- private transient final Lock lock;
+ private final transient Set<String> pendingAck;
+ private final transient Lock lock;
public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
@@ -129,8 +137,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
synchronized (lock) {
try {
return messageConsumed(message) && super.add(message);
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
LOG.error("message consumption", e);
throwable.set(e);
throw new RuntimeException(e);
@@ -162,10 +169,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
if (message.getJMSReplyTo() != null) { // Send reply only if the replyTo destination is set
- replyProducer.send(message.getJMSReplyTo(), getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
+ replyProducer.send(message.getJMSReplyTo(),
+ getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
}
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
LOG.error(ex.getLocalizedMessage());
throwable.set(ex);
throw new RuntimeException(ex);
@@ -199,8 +206,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
if (operatorRecoveredWindows != null) {
Arrays.sort(operatorRecoveredWindows);
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("fetching windows", e);
}
}
@@ -223,7 +229,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
pendingAck.add(message.getJMSMessageID());
MutableLong receivedCt = counters.getCounter(CounterKeys.RECEIVED);
receivedCt.increment();
- LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(), receivedCt.longValue());
+ LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(),
+ receivedCt.longValue());
return true;
}
@@ -239,11 +246,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
replyProducer = getSession().createProducer(null);
consumer = (isDurable() && isTopic()) ?
- getSession().createDurableSubscriber((Topic) getDestination(), consumerName) :
- getSession().createConsumer(getDestination());
+ getSession().createDurableSubscriber((Topic)getDestination(), consumerName) :
+ getSession().createConsumer(getDestination());
consumer.setMessageListener(this);
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -264,7 +270,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
@SuppressWarnings("unchecked")
- Map<String, T> recoveredData = (Map<String, T>) idempotentStorageManager.load(context.getId(), windowId);
+ Map<String, T> recoveredData = (Map<String, T>)idempotentStorageManager.load(context.getId(), windowId);
if (recoveredData == null) {
return;
}
@@ -272,8 +278,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
pendingAck.add(recoveredEntry.getKey());
emit(recoveredEntry.getValue());
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("replay", e);
}
}
@@ -306,8 +311,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
currentWindowRecoveryState.put(message.getJMSMessageID(), payload);
emit(payload);
}
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
throw new RuntimeException("processing msg", e);
}
}
@@ -320,12 +324,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
/* nothing to do here, so sleep for a while to avoid busy loop */
try {
Thread.sleep(spinMillis);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
- }
- else {
+ } else {
DTThrowable.rethrow(lthrowable);
}
}
@@ -338,7 +340,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
* acknowledged have been persisted because they wouldn't be redelivered. Also if they are persisted then
* they shouldn't be re-delivered because that would cause duplicates.<br/>
*
- * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is blocked.<br/>
+ * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is
+ * blocked.<br/>
*/
@Override
public void endWindow()
@@ -365,27 +368,24 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
}
ackCompleted = true;
pendingAck.clear();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
if (!ackCompleted) {
LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t);
}
DTThrowable.rethrow(t);
- }
- finally {
+ } finally {
if (stateSaved && !ackCompleted) {
try {
idempotentStorageManager.delete(context.getId(), currentWindowId);
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("unable to delete corrupted state", e);
}
}
}
}
emitCount = 0; //reset emit count
- }
- else if (operatorRecoveredWindows != null && currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
+ } else if (operatorRecoveredWindows != null &&
+ currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
//pendingAck is not cleared for the last replayed window of this operator. This is because there is
//still a chance that in the previous run the operator crashed after saving the state but before acknowledgement.
pendingAck.clear();
@@ -402,8 +402,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
if (isTransacted()) {
getSession().commit();
- }
- else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
+ } else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
lastMsg.acknowledge(); // acknowledge all consumed messages till now
}
}
@@ -418,8 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
idempotentStorageManager.deleteUpTo(context.getId(), windowId);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("committing", e);
}
}
@@ -441,8 +439,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
consumer = null;
super.cleanup();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException("at cleanup", ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
index bbc8d0f..a72d127 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
@@ -37,12 +37,10 @@ public class JMSStringInputOperator extends AbstractJMSInputOperator<String>
public String convert(Message message) throws JMSException
{
if (message instanceof TextMessage) {
- return ((TextMessage) message).getText();
- }
- else if (message instanceof StreamMessage) {
- return ((StreamMessage) message).readString();
- }
- else {
+ return ((TextMessage)message).getText();
+ } else if (message instanceof StreamMessage) {
+ return ((StreamMessage)message).readString();
+ } else {
throw new IllegalArgumentException("Unhandled message type " + message.getClass().getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
index 33f7f8d..ab9a261 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -38,7 +37,7 @@ public class AvgAggregator implements SingleMetricAggregator, Serializable
double sum = 0;
for (Object value : metricValues) {
- sum += ((Number) value).doubleValue();
+ sum += ((Number)value).doubleValue();
}
return sum / metricValues.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
index 40d8c04..13988f9 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class DoubleMaxAggregator implements SingleMetricAggregator, Serializable
{
Double max = null;
for (Object value : metricValues) {
- double dval = ((Number) value).doubleValue();
+ double dval = ((Number)value).doubleValue();
if (max == null || dval > max) {
max = dval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
index 68f8f2a..6b228e8 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class FloatMaxAggregator implements SingleMetricAggregator, Serializable
{
Float max = null;
for (Object value : metricValues) {
- float fval = ((Number) value).floatValue();
+ float fval = ((Number)value).floatValue();
if (max == null || fval > max) {
max = fval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
index 71eaba0..2cd0264 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class IntMaxAggregator implements SingleMetricAggregator, Serializable
{
Integer max = null;
for (Object value : metricValues) {
- int ival = ((Number) value).intValue();
+ int ival = ((Number)value).intValue();
if (max == null || ival > max) {
max = ival;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
index 43a1abd..f0aab73 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
index d65e744..1b1b712 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class DoubleMinAggregator implements SingleMetricAggregator, Serializable
{
Double min = null;
for (Object value : metricValues) {
- double dval = ((Number) value).doubleValue();
+ double dval = ((Number)value).doubleValue();
if (min == null || dval < min) {
min = dval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
index bbe0861..6713911 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class FloatMinAggregator implements SingleMetricAggregator, Serializable
{
Float min = null;
for (Object metric : metricValues) {
- float fval = ((Number) metric).floatValue();
+ float fval = ((Number)metric).floatValue();
if (min == null || fval < min) {
min = fval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
index 76ffebc..72c610f 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class IntMinAggregator implements SingleMetricAggregator, Serializable
{
Integer min = null;
for (Object value : metricValues) {
- int ival = ((Number) value).intValue();
+ int ival = ((Number)value).intValue();
if (min == null || ival < min) {
min = ival;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
index 19a1dab..0deb808 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class LongMinAggregator implements SingleMetricAggregator, Serializable
{
Long min = null;
for (Object value : metricValues) {
- long lval = ((Number) value).longValue();
+ long lval = ((Number)value).longValue();
if (min == null || lval < min) {
min = lval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
index 0d5c646..4affef7 100644
--- a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
+++ b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
@@ -31,79 +31,89 @@ import com.datatorrent.netlet.util.Slice;
* Tests for {@link KryoSerializableStreamCodec}
*
*/
-public class KryoStreamCodecTest {
+public class KryoStreamCodecTest
+{
- public static class TestTuple {
- final Integer field;
+ public static class TestTuple
+ {
+ final Integer field;
- @SuppressWarnings("unused")
- private TestTuple(){
- this.field= 0;
- }
-
- public TestTuple(Integer x){
- this.field= Preconditions.checkNotNull(x,"x");
- }
+ @SuppressWarnings("unused")
+ private TestTuple()
+ {
+ this.field = 0;
+ }
- @Override
- public boolean equals(Object o) {
- return o.getClass()== this.getClass() && ((TestTuple)o).field.equals(this.field);
- }
+ public TestTuple(Integer x)
+ {
+ this.field = Preconditions.checkNotNull(x, "x");
+ }
- @Override
- public int hashCode() {
- return TestTuple.class.hashCode()^ this.field.hashCode();
- }
+ @Override
+ public boolean equals(Object o)
+ {
+ return o.getClass() == this.getClass() && ((TestTuple)o).field.equals(this.field);
}
- public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple> {
+ @Override
+ public int hashCode()
+ {
+ return TestTuple.class.hashCode() ^ this.field.hashCode();
+ }
+ }
- public TestKryoStreamCodec(){
- super();
- }
+ public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple>
+ {
- @Override
- public int getPartition(TestTuple testTuple) {
- return testTuple.field;
- }
+ public TestKryoStreamCodec()
+ {
+ super();
}
- @Test
- public void testSomeMethod() throws IOException
+ @Override
+ public int getPartition(TestTuple testTuple)
{
- TestKryoStreamCodec coder = new TestKryoStreamCodec();
- TestKryoStreamCodec decoder = new TestKryoStreamCodec();
+ return testTuple.field;
+ }
+ }
- KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
- Slice sliceOfObj = objCoder.toByteArray(10);
- Integer decodedObj = (Integer) objCoder.fromByteArray(sliceOfObj);
+ @Test
+ public void testSomeMethod() throws IOException
+ {
+ TestKryoStreamCodec coder = new TestKryoStreamCodec();
+ TestKryoStreamCodec decoder = new TestKryoStreamCodec();
- Assert.assertEquals("codec", decodedObj.intValue(), 10);
+ KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
+ Slice sliceOfObj = objCoder.toByteArray(10);
+ Integer decodedObj = (Integer)objCoder.fromByteArray(sliceOfObj);
- TestTuple tp= new TestTuple(5);
+ Assert.assertEquals("codec", decodedObj.intValue(), 10);
- Slice dsp1 = coder.toByteArray(tp);
- Slice dsp2 = coder.toByteArray(tp);
- Assert.assertEquals(dsp1, dsp2);
+ TestTuple tp = new TestTuple(5);
- Object tcObject1 = decoder.fromByteArray(dsp1);
- assert (tp.equals(tcObject1));
+ Slice dsp1 = coder.toByteArray(tp);
+ Slice dsp2 = coder.toByteArray(tp);
+ Assert.assertEquals(dsp1, dsp2);
- Object tcObject2 = decoder.fromByteArray(dsp2);
- assert (tp.equals(tcObject2));
+ Object tcObject1 = decoder.fromByteArray(dsp1);
+ assert (tp.equals(tcObject1));
- dsp1 = coder.toByteArray(tp);
- dsp2 = coder.toByteArray(tp);
- Assert.assertEquals(dsp1, dsp2);
- }
+ Object tcObject2 = decoder.fromByteArray(dsp2);
+ assert (tp.equals(tcObject2));
- @Test
- public void testFinalFieldSerialization() throws Exception{
- TestTuple t1 = new TestTuple(5);
- TestKryoStreamCodec codec= new TestKryoStreamCodec();
+ dsp1 = coder.toByteArray(tp);
+ dsp2 = coder.toByteArray(tp);
+ Assert.assertEquals(dsp1, dsp2);
+ }
- Slice dsp = codec.toByteArray(t1);
- TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
- Assert.assertEquals("", t1.field, t2.field);
- }
+ @Test
+ public void testFinalFieldSerialization() throws Exception
+ {
+ TestTuple t1 = new TestTuple(5);
+ TestKryoStreamCodec codec = new TestKryoStreamCodec();
+
+ Slice dsp = codec.toByteArray(t1);
+ TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
+ Assert.assertEquals("", t1.field, t2.field);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
index 9ee536e..caa02bf 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
@@ -62,7 +62,7 @@ public class CacheManagerTest
@Override
public boolean apply(@Nullable Object key)
{
- return key != null && (Integer) key <= 5;
+ return key != null && (Integer)key <= 5;
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
index 9078271..46f49e2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
@@ -18,7 +18,12 @@
*/
package com.datatorrent.lib.db.jdbc;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
@@ -36,7 +41,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -49,8 +53,9 @@ public class JDBCLookupCacheBackedOperatorTest
private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName();
protected static final String TABLE_NAME = "Test_Lookup_Cache";
- protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = new TestJDBCLookupCacheBackedOperator();
- protected static CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+ protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator =
+ new TestJDBCLookupCacheBackedOperator();
+ protected static CollectorTestSink<Object> sink = new CollectorTestSink<>();
protected static final Map<Integer, String> mapping = Maps.newHashMap();
static {
@@ -61,9 +66,10 @@ public class JDBCLookupCacheBackedOperatorTest
mapping.put(5, "five");
}
- protected static transient final Logger logger = LoggerFactory.getLogger(JDBCLookupCacheBackedOperatorTest.class);
+ protected static final transient Logger logger = LoggerFactory.getLogger(
+ JDBCLookupCacheBackedOperatorTest.class);
- private final static Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<List<Object>>();
+ private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>();
public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String>
{
@@ -83,15 +89,15 @@ public class JDBCLookupCacheBackedOperatorTest
@Override
protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException
{
- putStatement.setInt(1, (Integer) key);
- putStatement.setString(2, (String) value);
+ putStatement.setInt(1, (Integer)key);
+ putStatement.setString(2, (String)value);
}
@Override
protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException
{
- getStatement.setInt(1, (Integer) key);
+ getStatement.setInt(1, (Integer)key);
}
@Override
@@ -121,8 +127,7 @@ public class JDBCLookupCacheBackedOperatorTest
List<Object> values = super.getAll(keys);
try {
bulkValuesExchanger.exchange(values);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
return values;
@@ -164,7 +169,7 @@ public class JDBCLookupCacheBackedOperatorTest
Statement stmt = con.createStatement();
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
- + " (col1 INTEGER, col2 VARCHAR(20))";
+ + " (col1 INTEGER, col2 VARCHAR(20))";
stmt.executeUpdate(createTable);
stmt.executeUpdate("Delete from " + TABLE_NAME);
@@ -172,8 +177,8 @@ public class JDBCLookupCacheBackedOperatorTest
// populate the database
for (Map.Entry<Integer, String> entry : mapping.entrySet()) {
String insert = "INSERT INTO " + TABLE_NAME
- + " (col1, col2) VALUES (" + entry.getKey() + ", '"
- + entry.getValue() + "')";
+ + " (col1, col2) VALUES (" + entry.getKey() + ", '"
+ + entry.getValue() + "')";
stmt.executeUpdate(insert);
}