You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by br...@apache.org on 2015/11/19 21:35:04 UTC
[1/4] incubator-apex-malhar git commit: MLHR-1912 #comment fixed
style violations
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 5b9eff836 -> 05b2b8987
MLHR-1912 #comment fixed style violations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2fe4bec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2fe4bec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2fe4bec5
Branch: refs/heads/devel-3
Commit: 2fe4bec5343032863c204a3d90c79ec176684018
Parents: 5b9eff8
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Nov 19 01:14:23 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 19 11:11:42 2015 -0800
----------------------------------------------------------------------
.../lib/db/jdbc/JdbcTransactionalStore.java | 12 +++---
.../lib/io/fs/AbstractFileOutputOperator.java | 45 +++++++++-----------
2 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2fe4bec5/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
index 1d0f720..e4a7229 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class JdbcTransactionalStore extends JdbcStore implements TransactionableStore
{
- private static transient final Logger LOG = LoggerFactory.getLogger(JdbcTransactionalStore.class);
+ private static final transient Logger LOG = LoggerFactory.getLogger(JdbcTransactionalStore.class);
public static String DEFAULT_APP_ID_COL = "dt_app_id";
public static String DEFAULT_OPERATOR_ID_COL = "dt_operator_id";
@@ -198,7 +198,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
Long lastWindow = getCommittedWindowIdHelper(appId, operatorId);
try {
- if(lastWindow == null) {
+ if (lastWindow == null) {
lastWindowInsertCommand.close();
connection.commit();
}
@@ -206,14 +206,12 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
lastWindowFetchCommand.close();
LOG.debug("Last window id: {}", lastWindow);
- if(lastWindow == null) {
+ if (lastWindow == null) {
return -1L;
- }
- else {
+ } else {
return lastWindow;
}
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2fe4bec5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index fcbe1e8..2aa658f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -126,7 +126,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
/**
* The default number of max open files.
*/
- public final static int DEFAULT_MAX_OPEN_FILES = 100;
+ public static final int DEFAULT_MAX_OPEN_FILES = 100;
/**
* Keyname to rolling file number.
@@ -310,9 +310,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
FileSystem tempFS = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration());
- if(tempFS instanceof LocalFileSystem)
- {
- tempFS = ((LocalFileSystem) tempFS).getRaw();
+ if (tempFS instanceof LocalFileSystem) {
+ tempFS = ((LocalFileSystem)tempFS).getRaw();
}
return tempFS;
@@ -331,8 +330,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//Getting required file system instance.
try {
fs = getFSInstance();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -375,7 +373,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
if (rollingFile) {
//delete the left over future rolling files produced from the previous crashed instance of this operator.
- for(String seenFileName: endOffsets.keySet()) {
+ for (String seenFileName : endOffsets.keySet()) {
try {
Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
int nextPart = fileOpenPart + 1;
@@ -723,12 +721,11 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
closeStream(fsFilterStreamContext);
filesWithOpenStreams.remove(seenFileName);
totalWritingTime += System.currentTimeMillis() - start;
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
//Count number of failures
numberOfFailures++;
//Add names of first N failed files to list
- if(fileNames.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
+ if (fileNames.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
fileNames.add(seenFileName);
//save exception
savedException = ex;
@@ -741,8 +738,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
try {
fs.close();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
//Closing file system failed
savedException = ex;
fsFailed = true;
@@ -753,20 +749,20 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
String errorMessage = "";
//File system failed to close
- if(fsFailed) {
+ if (fsFailed) {
errorMessage += "Closing the fileSystem failed. ";
}
//Print names of atmost first N files that failed to close
- if(!fileNames.isEmpty()) {
+ if (!fileNames.isEmpty()) {
errorMessage += "The following files failed closing: ";
}
- for(String seenFileName: fileNames) {
+ for (String seenFileName: fileNames) {
errorMessage += seenFileName + ", ";
}
- if(numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
+ if (numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
errorMessage += (numberOfFailures - MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) +
" more files failed.";
}
@@ -801,7 +797,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
totalBytesWritten += tupleBytes.length;
MutableLong currentOffset = endOffsets.get(fileName);
- if(currentOffset == null) {
+ if (currentOffset == null) {
currentOffset = new MutableLong(0);
endOffsets.put(fileName, currentOffset);
}
@@ -881,11 +877,10 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
*/
protected void flush(FSDataOutputStream fsOutput) throws IOException
{
- if(fs instanceof LocalFileSystem ||
- fs instanceof RawLocalFileSystem) {
+ if (fs instanceof LocalFileSystem ||
+ fs instanceof RawLocalFileSystem) {
fsOutput.flush();
- }
- else {
+ } else {
fsOutput.hflush();
}
}
@@ -920,8 +915,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* @param part The part number of the rolling file.
* @return The rolling file name.
*/
- protected String getPartFileName(String fileName,
- int part)
+ protected String getPartFileName(String fileName, int part)
{
return fileName + "." + part;
}
@@ -940,7 +934,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
currentWindow = windowId;
}
- @Override
+ @Override
public void endWindow()
{
try {
@@ -951,8 +945,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
totalWritingTime += System.currentTimeMillis() - start;
//streamContext.resetFilter();
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
[4/4] incubator-apex-malhar git commit: Merge branch 'MLHR-1912' into
devel-3
Posted by br...@apache.org.
Merge branch 'MLHR-1912' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/05b2b898
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/05b2b898
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/05b2b898
Branch: refs/heads/devel-3
Commit: 05b2b89874a1496be9fe476eef58febff0069292
Parents: 5b9eff8 b9aa203
Author: bright <br...@bright-mac.local>
Authored: Thu Nov 19 12:32:48 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Thu Nov 19 12:32:48 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/algo/UniqueValueCount.java | 13 +-
.../lib/codec/KryoSerializableStreamCodec.java | 4 +-
.../AbstractDBLookupCacheBackedOperator.java | 16 ++-
.../datatorrent/lib/db/cache/CacheManager.java | 27 +++--
.../datatorrent/lib/db/cache/CacheStore.java | 5 +-
.../lib/db/jdbc/AbstractJdbcInputOperator.java | 15 +--
...stractJdbcTransactionableOutputOperator.java | 20 ++--
.../db/jdbc/JDBCLookupCacheBackedOperator.java | 15 +--
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 55 ++++++---
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 98 +++++++++------
.../com/datatorrent/lib/db/jdbc/JdbcStore.java | 14 +--
.../lib/db/jdbc/JdbcTransactionalStore.java | 48 ++++----
.../lib/io/AbstractFTPInputOperator.java | 14 +--
.../lib/io/fs/AbstractFileOutputOperator.java | 68 +++++------
.../lib/io/jms/AbstractJMSInputOperator.java | 85 +++++++------
.../lib/io/jms/JMSStringInputOperator.java | 10 +-
.../datatorrent/lib/metric/AvgAggregator.java | 3 +-
.../lib/metric/max/DoubleMaxAggregator.java | 3 +-
.../lib/metric/max/FloatMaxAggregator.java | 3 +-
.../lib/metric/max/IntMaxAggregator.java | 3 +-
.../lib/metric/max/LongMaxAggregator.java | 1 -
.../lib/metric/min/DoubleMinAggregator.java | 3 +-
.../lib/metric/min/FloatMinAggregator.java | 3 +-
.../lib/metric/min/IntMinAggregator.java | 3 +-
.../lib/metric/min/LongMinAggregator.java | 3 +-
.../lib/codec/KryoStreamCodecTest.java | 120 ++++++++++---------
.../lib/db/cache/CacheManagerTest.java | 2 +-
.../jdbc/JDBCLookupCacheBackedOperatorTest.java | 33 ++---
.../lib/db/jdbc/JdbcOperatorTest.java | 43 ++++---
.../datatorrent/lib/db/jdbc/JdbcStoreTest.java | 7 +-
.../lib/io/FTPStringInputOperatorTest.java | 7 +-
.../io/fs/AbstractFileOutputOperatorTest.java | 4 +-
.../lib/io/jms/JMSStringInputOperatorTest.java | 39 +++---
pom.xml | 2 +-
34 files changed, 412 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-apex-malhar git commit: MLHR-1912 #resolve #comment
fixed style violations
Posted by br...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index efa9a4c..1202511 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -18,7 +18,12 @@
*/
package com.datatorrent.lib.db.jdbc;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import javax.annotation.Nonnull;
@@ -32,7 +37,6 @@ import com.google.common.collect.Lists;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -109,16 +113,18 @@ public class JdbcOperatorTest
Statement stmt = con.createStatement();
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
- + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
- + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
- + ")";
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
stmt.executeUpdate(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
stmt.executeUpdate(createTable);
- String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
+ String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME
+ + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
stmt.executeUpdate(createPOJOTable);
} catch (Throwable e) {
DTThrowable.rethrow(e);
@@ -255,9 +261,11 @@ public class JdbcOperatorTest
transactionalStore.setDatabaseDriver(DB_DRIVER);
transactionalStore.setDatabaseUrl(URL);
- com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
TestOutputOperator outputOperator = new TestOutputOperator();
outputOperator.setBatchSize(3);
@@ -287,9 +295,11 @@ public class JdbcOperatorTest
transactionalStore.setDatabaseDriver(DB_DRIVER);
transactionalStore.setDatabaseUrl(URL);
- com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
outputOperator.setBatchSize(3);
@@ -332,9 +342,11 @@ public class JdbcOperatorTest
store.setDatabaseDriver(DB_DRIVER);
store.setDatabaseUrl(URL);
- com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
@@ -360,7 +372,8 @@ public class JdbcOperatorTest
Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
insertEventsInTable(10);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcStoreTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcStoreTest.java
index 4048296..97e3e57 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcStoreTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcStoreTest.java
@@ -55,7 +55,8 @@ public class JdbcStoreTest
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
Input input = new Input(bais);
- JdbcStore deserializedStrore = (JdbcStore) kryo.readClassAndObject(input);
- Assert.assertEquals("connection properties", store.getConnectionProperties(), deserializedStrore.getConnectionProperties());
+ JdbcStore deserializedStrore = (JdbcStore)kryo.readClassAndObject(input);
+ Assert.assertEquals("connection properties", store.getConnectionProperties(),
+ deserializedStrore.getConnectionProperties());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/io/FTPStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/FTPStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/FTPStringInputOperatorTest.java
index 05c7689..867d309 100644
--- a/library/src/test/java/com/datatorrent/lib/io/FTPStringInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/FTPStringInputOperatorTest.java
@@ -70,9 +70,10 @@ public class FTPStringInputOperatorTest
ftpOperator.setPassword("test");
ftpOperator.setDirectory(homeDirectory.getPath());
- ftpOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(11, new Attribute.AttributeMap.DefaultAttributeMap()));
+ ftpOperator.setup(
+ new OperatorContextTestHelper.TestIdOperatorContext(11, new Attribute.AttributeMap.DefaultAttributeMap()));
- sink = new CollectorTestSink<Object>();
+ sink = new CollectorTestSink<>();
ftpOperator.output.setSink(sink);
}
@@ -101,4 +102,4 @@ public class FTPStringInputOperatorTest
Assert.assertTrue("2", testMeta.sink.collectedTuples.contains("2"));
Assert.assertTrue("20", testMeta.sink.collectedTuples.contains("20"));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index cbcc8b4..94c587b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -310,7 +310,8 @@ public class AbstractFileOutputOperatorTest
}
@Test
- public void testSingleFileCompletedWriteTmp() {
+ public void testSingleFileCompletedWriteTmp()
+ {
testMeta.writeToTmp = true;
testSingleFileCompletedWrite();
}
@@ -420,6 +421,7 @@ public class AbstractFileOutputOperatorTest
singleFileName,
correctContents);
}
+
@Test
public void testSingleFileFailedWriteOverwriteInitiaTmp() throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
index 10ca242..b8c916d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
@@ -19,12 +19,15 @@
package com.datatorrent.lib.io.jms;
import java.io.File;
-import java.io.IOException;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -33,10 +36,12 @@ import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.io.FileUtils;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -59,8 +64,7 @@ public class JMSStringInputOperatorTest
testBase = new JMSTestBase();
try {
testBase.beforTest();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
String methodName = description.getMethodName();
@@ -75,7 +79,7 @@ public class JMSStringInputOperatorTest
operator = new JMSStringInputOperator();
operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
- sink = new CollectorTestSink<Object>();
+ sink = new CollectorTestSink<>();
operator.output.setSink(sink);
operator.setup(context);
operator.activate(context);
@@ -87,13 +91,9 @@ public class JMSStringInputOperatorTest
operator.deactivate();
operator.teardown();
try {
- FileUtils.deleteDirectory(new File("target/"+ description.getClassName()));
+ FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
testBase.afterTest();
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -125,7 +125,8 @@ public class JMSStringInputOperatorTest
testMeta.operator.setup(testMeta.context);
testMeta.operator.activate(testMeta.context);
- Assert.assertEquals("largest recovery window", 1, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+ Assert.assertEquals("largest recovery window", 1,
+ testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
testMeta.operator.beginWindow(1);
testMeta.operator.endWindow();
@@ -155,14 +156,14 @@ public class JMSStringInputOperatorTest
testMeta.operator.emitTuples();
try {
testMeta.operator.endWindow();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.debug("ack failed");
}
testMeta.operator.setup(testMeta.context);
testMeta.operator.activate(testMeta.context);
- Assert.assertEquals("window 1 should not exist", Stateless.WINDOW_ID, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+ Assert.assertEquals("window 1 should not exist", Stateless.WINDOW_ID,
+ testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
}
private void produceMsg(int numMessages) throws Exception
@@ -197,5 +198,5 @@ public class JMSStringInputOperatorTest
}
- private static transient final Logger LOG = LoggerFactory.getLogger(JMSStringInputOperatorTest.class);
+ private static final transient Logger LOG = LoggerFactory.getLogger(JMSStringInputOperatorTest.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e3a22fa..92466ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>8938</maxAllowedViolations>
+ <maxAllowedViolations>8768</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
[3/4] incubator-apex-malhar git commit: MLHR-1912 #resolve #comment
fixed style violations
Posted by br...@apache.org.
MLHR-1912 #resolve #comment fixed style violations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b9aa203d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b9aa203d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b9aa203d
Branch: refs/heads/devel-3
Commit: b9aa203d7c411181665362dc0cee7420c7d61291
Parents: 2fe4bec
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Nov 19 08:30:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 19 11:16:17 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/algo/UniqueValueCount.java | 13 +-
.../lib/codec/KryoSerializableStreamCodec.java | 4 +-
.../AbstractDBLookupCacheBackedOperator.java | 16 ++-
.../datatorrent/lib/db/cache/CacheManager.java | 27 +++--
.../datatorrent/lib/db/cache/CacheStore.java | 5 +-
.../lib/db/jdbc/AbstractJdbcInputOperator.java | 15 +--
...stractJdbcTransactionableOutputOperator.java | 20 ++--
.../db/jdbc/JDBCLookupCacheBackedOperator.java | 15 +--
.../lib/db/jdbc/JdbcPOJOInputOperator.java | 55 ++++++---
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 98 +++++++++------
.../com/datatorrent/lib/db/jdbc/JdbcStore.java | 14 +--
.../lib/db/jdbc/JdbcTransactionalStore.java | 36 +++---
.../lib/io/AbstractFTPInputOperator.java | 14 +--
.../lib/io/fs/AbstractFileOutputOperator.java | 23 ++--
.../lib/io/jms/AbstractJMSInputOperator.java | 85 +++++++------
.../lib/io/jms/JMSStringInputOperator.java | 10 +-
.../datatorrent/lib/metric/AvgAggregator.java | 3 +-
.../lib/metric/max/DoubleMaxAggregator.java | 3 +-
.../lib/metric/max/FloatMaxAggregator.java | 3 +-
.../lib/metric/max/IntMaxAggregator.java | 3 +-
.../lib/metric/max/LongMaxAggregator.java | 1 -
.../lib/metric/min/DoubleMinAggregator.java | 3 +-
.../lib/metric/min/FloatMinAggregator.java | 3 +-
.../lib/metric/min/IntMinAggregator.java | 3 +-
.../lib/metric/min/LongMinAggregator.java | 3 +-
.../lib/codec/KryoStreamCodecTest.java | 120 ++++++++++---------
.../lib/db/cache/CacheManagerTest.java | 2 +-
.../jdbc/JDBCLookupCacheBackedOperatorTest.java | 33 ++---
.../lib/db/jdbc/JdbcOperatorTest.java | 43 ++++---
.../datatorrent/lib/db/jdbc/JdbcStoreTest.java | 7 +-
.../lib/io/FTPStringInputOperatorTest.java | 7 +-
.../io/fs/AbstractFileOutputOperatorTest.java | 4 +-
.../lib/io/jms/JMSStringInputOperatorTest.java | 39 +++---
pom.xml | 2 +-
34 files changed, 388 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
index 3d5ed3b..6f8750d 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
@@ -24,15 +24,13 @@ import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.common.util.BaseOperator;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
* This operator counts the number of unique values corresponding to a key within a window.
@@ -89,20 +87,21 @@ public class UniqueValueCount<K> extends BaseOperator
@SuppressWarnings({"rawtypes", "unchecked"})
public Unifier<KeyValPair<K, Integer>> getUnifier()
{
- return (Unifier) new UniqueCountUnifier<K>();
+ return (Unifier)new UniqueCountUnifier<K>();
}
};
/**
* The output port which emits key and set containing unique values
*/
- public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
+ public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues =
+ new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
{
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Unifier<KeyValPair<K, Set<Object>>> getUnifier()
{
- return (Unifier) new UniqueCountSetUnifier<K>();
+ return (Unifier)new UniqueCountSetUnifier<K>();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
index ecfa422..99e0a6e 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
@@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
+
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.util.Slice;
@@ -56,7 +57,8 @@ public class KryoSerializableStreamCodec<T> implements StreamCodec<T>, Serializa
}
/**
- * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is more efficient.
+ * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is
+ * more efficient.
*
* @param clazz class to register with Kryo.
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
index 1510adf..42e02d3 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
@@ -29,7 +29,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
-
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.util.KeyValPair;
@@ -53,7 +52,8 @@ import com.datatorrent.lib.util.KeyValPair;
* @param <S> type of store
* @since 0.9.1
*/
-public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable> implements Operator, CacheManager.Backup
+public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable>
+ implements Operator, CacheManager.Backup
{
@NotNull
protected S store;
@@ -83,11 +83,11 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
Object value = cacheManager.get(key);
if (value != null) {
- output.emit(new KeyValPair<Object, Object>(key, value));
+ output.emit(new KeyValPair<>(key, value));
}
}
- public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<KeyValPair<Object, Object>>();
+ public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<>();
@Override
public void beginWindow(long l)
@@ -107,8 +107,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
cacheManager.setBackup(this);
try {
cacheManager.initialize();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -118,8 +117,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
{
try {
cacheManager.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("closing manager", e);
}
}
@@ -171,6 +169,6 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
*/
protected abstract Object getKeyFromTuple(T tuple);
- private final static Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
index 6d9f89d..2ea31f1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
@@ -20,7 +20,12 @@ package com.datatorrent.lib.db.cache;
import java.io.Closeable;
import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -36,15 +41,15 @@ import com.datatorrent.lib.db.KeyValueStore;
/**
* Manages primary and secondary stores.<br/>
- * <p>
- * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup store and retrieves the value.<br/>
+ * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup
+ * store and retrieves the value.<br/>
* If the key was present in the backup store, its value is returned and also saved in the primary store.
- * </p>
- * <p>
- * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like databases.<br/>
- * Store Manager can also refresh the values of keys at a specified time every day. This time is in format HH:mm:ss Z.<br/>
+ * <p/>
+ * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like
+ * databases.<br/>
+ * Store Manager can also refresh the values of keys at a specified time every day.
+ * This time is in format HH:mm:ss Z.<br/>
* This is not thread-safe.
- * </p>
*
* @since 0.9.2
*/
@@ -180,7 +185,7 @@ public class CacheManager implements Closeable
/**
* A primary store should also provide setting the value for a key.
*/
- public static interface Primary extends KeyValueStore
+ public interface Primary extends KeyValueStore
{
/**
@@ -195,7 +200,7 @@ public class CacheManager implements Closeable
* Backup store is queried when {@link Primary} doesn't contain a key.<br/>
* It also provides data needed at startup.<br/>
*/
- public static interface Backup extends KeyValueStore
+ public interface Backup extends KeyValueStore
{
/**
* <br>Backup stores are also used to initialize primary stores. This fetches initialization data.</br>
@@ -206,6 +211,6 @@ public class CacheManager implements Closeable
}
@SuppressWarnings("unused")
- private final static Logger LOG = LoggerFactory.getLogger(CacheManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index d10fe13..72063ee 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -114,8 +114,7 @@ public class CacheStore implements CacheManager.Primary
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) {
cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
- }
- else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
+ } else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
}
cache = cacheBuilder.build();
@@ -186,7 +185,7 @@ public class CacheStore implements CacheManager.Primary
/**
* Strategies for time-based expiration of entries.
*/
- public static enum ExpiryType
+ public enum ExpiryType
{
/**
* Only expire the entries after the specified duration has passed since the entry was last accessed by a read
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
index 1cb7635..fe6b077 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.db.AbstractStoreInputOperator;
* and emits the data as tuples.
* Subclasses should implement the methods required to read the data from the database.
* <p>
- * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
+ * This is an abstract class. Sub-classes need to implement
+ * {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
* </p>
* @displayName Abstract JDBC Input
* @category Input
@@ -84,17 +85,14 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
outputPort.emit(tuple);
}
while (result.next());
- }
- else {
+ } else {
// No rows available wait for some time before retrying so as to not continuously slam the database
Thread.sleep(waitForDataTimeout);
}
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
store.disconnect();
throw new RuntimeException(String.format("Error while running query: %s", query), ex);
- }
- catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@@ -106,8 +104,7 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
super.setup(context);
try {
queryStatement = store.getConnection().createStatement();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("creating query", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index b57a9b1..77b76c1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -22,8 +22,8 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
-import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
-
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
/**
@@ -40,7 +39,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
* <p>
* This operator creates a transaction at the start of window, executes batches of sql updates,
* and closes the transaction at the end of the window. Each tuple corresponds to an SQL update statement.
- * The operator groups the updates in a batch and submits them with one call to the database. Batch processing improves performance considerably.<br/>
+ * The operator groups the updates in a batch and submits them with one call to the database. Batch processing
+ * improves performance considerably.<br/>
* The size of a batch is configured by batchSize property.
* </p>
* <p>
@@ -55,7 +55,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
* @param <T> type of tuple
* @since 0.9.4
*/
-public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
+public abstract class AbstractJdbcTransactionableOutputOperator<T>
+ extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
{
protected static int DEFAULT_BATCH_SIZE = 1000;
@@ -80,8 +81,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
super.setup(context);
try {
updateCommand = store.connection.prepareStatement(getUpdateCommand());
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -117,11 +117,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
}
updateCommand.executeBatch();
updateCommand.clearBatch();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("processing batch", e);
- }
- finally {
+ } finally {
batchStartIdx += tuples.size() - batchStartIdx;
}
}
@@ -142,7 +140,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
*
* @return the sql statement to update a tuple in the database.
*/
- @Nonnull
+ @NotNull
protected abstract String getUpdateCommand();
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
index 93006eb..7ca2ae1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
@@ -78,8 +78,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
try {
putStatement = store.connection.prepareStatement(insertQuery);
getStatement = store.connection.prepareStatement(getQuery);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -90,8 +89,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
try {
preparePutStatement(putStatement, key, value);
putStatement.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while executing insert", e);
}
}
@@ -103,8 +101,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
prepareGetStatement(getStatement, key);
ResultSet resultSet = getStatement.executeQuery();
return processResultSet(resultSet);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while fetching key", e);
}
}
@@ -118,8 +115,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
prepareGetStatement(getStatement, key);
ResultSet resultSet = getStatement.executeQuery();
values.add(processResultSet(resultSet));
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("while fetching keys", e);
}
}
@@ -128,7 +124,8 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
protected abstract void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException;
- protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException;
+ protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value)
+ throws SQLException;
protected abstract String fetchInsertQuery();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index eafff3c..3aa6fac 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -19,7 +19,14 @@
package com.datatorrent.lib.db.jdbc;
import java.math.BigDecimal;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import java.util.Map;
@@ -37,7 +44,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
@@ -49,8 +55,8 @@ import com.datatorrent.lib.util.PojoUtils;
*
* For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/>
*
- * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is most efficient
- * when the tables/views are indexed and the query uses this information to retrieve data.<br/>
+ * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is
+ * most efficient when the tables/views are indexed and the query uses this information to retrieve data.<br/>
* This can be achieved in sub-classes by overriding {@link #queryToRetrieveData()} and {@link #setRuntimeParams()}.
*
* @displayName Jdbc Input Operator
@@ -58,7 +64,8 @@ import com.datatorrent.lib.util.PojoUtils;
* @tags database, sql, pojo, jdbc
* @since 2.1.0
*/
-public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext>
+public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
+ implements Operator.ActivationListener<Context.OperatorContext>
{
private static int DEF_FETCH_SIZE = 100;
@@ -363,53 +370,65 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> imp
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
String.class);
break;
case (Types.BOOLEAN):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.TINYINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.SMALLINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.INTEGER):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.BIGINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.FLOAT):
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.DOUBLE):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DECIMAL:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
- BigDecimal.class);
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ BigDecimal.class);
break;
case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
index 5399c44..ffb4160 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -18,12 +18,32 @@
*/
package com.datatorrent.lib.db.jdbc;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
@@ -34,18 +54,6 @@ import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterShort;
-import java.math.BigDecimal;
-import java.sql.*;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
/**
* <p>
* JdbcPOJOOutputOperator class.</p>
@@ -57,7 +65,8 @@ import com.google.common.collect.Lists;
* @since 2.1.0
*/
@Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext>
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+ implements Operator.ActivationListener<OperatorContext>
{
@NotNull
private List<FieldInfo> fieldInfos;
@@ -165,51 +174,51 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.BOOLEAN):
- statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.TINYINT):
- statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.SMALLINT):
- statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.INTEGER):
- statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.BIGINT):
- statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.FLOAT):
- statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case (Types.DOUBLE):
- statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.DECIMAL:
- statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple));
+ statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.TIMESTAMP:
- statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
case Types.TIME:
- statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
case Types.DATE:
- statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
break;
default:
@@ -271,53 +280,64 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
switch (type) {
case (Types.CHAR):
case (Types.VARCHAR):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(),
String.class);
break;
case (Types.BOOLEAN):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.TINYINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.SMALLINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.INTEGER):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.BIGINT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.FLOAT):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case (Types.DOUBLE):
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DECIMAL:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
- BigDecimal.class);
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
break;
case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression());
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
index 2e284ad..d9901aa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
@@ -32,8 +32,8 @@ import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.Connectable;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* A {@link Connectable} that uses jdbc to connect to stores.
@@ -125,7 +125,8 @@ public class JdbcStore implements Connectable
*/
public void setConnectionProperties(String connectionProps)
{
- String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults().split(connectionProps), String.class);
+ String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults()
+ .split(connectionProps), String.class);
for (int i = 0; i < properties.length; i += 2) {
if (i + 1 < properties.length) {
connectionProperties.put(properties[i], properties[i + 1]);
@@ -163,8 +164,7 @@ public class JdbcStore implements Connectable
connection = DriverManager.getConnection(databaseUrl, connectionProperties);
logger.debug("JDBC connection Success");
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
DTThrowable.rethrow(t);
}
}
@@ -177,8 +177,7 @@ public class JdbcStore implements Connectable
{
try {
connection.close();
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
throw new RuntimeException("closing database resource", ex);
}
}
@@ -188,8 +187,7 @@ public class JdbcStore implements Connectable
{
try {
return !connection.isClosed();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("is isConnected", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
index e4a7229..9bc18b0 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
@@ -24,10 +24,11 @@ import java.sql.SQLException;
import javax.validation.constraints.NotNull;
-import com.datatorrent.lib.db.TransactionableStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.db.TransactionableStore;
+
/**
* <p>JdbcTransactionalStore class.</p>
*
@@ -117,7 +118,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
super.connect();
try {
String command = "select " + metaTableWindowColumn + " from " + metaTable + " where " + metaTableAppIdColumn +
- " = ? and " + metaTableOperatorIdColumn + " = ?";
+ " = ? and " + metaTableOperatorIdColumn + " = ?";
logger.debug(command);
lastWindowFetchCommand = connection.prepareStatement(command);
@@ -126,18 +127,18 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
logger.debug(command);
lastWindowInsertCommand = connection.prepareStatement(command);
- command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? " +
- " and " + metaTableOperatorIdColumn + " = ?";
+ command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? "
+ + " and " + metaTableOperatorIdColumn + " = ?";
logger.debug(command);
lastWindowUpdateCommand = connection.prepareStatement(command);
- command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn + " = ?";
+ command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn
+ + " = ?";
logger.debug(command);
lastWindowDeleteCommand = connection.prepareStatement(command);
connection.setAutoCommit(false);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -148,8 +149,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
if (lastWindowUpdateCommand != null) {
try {
lastWindowUpdateCommand.close();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -168,8 +168,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
try {
connection.commit();
inTransaction = false;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -180,8 +179,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
try {
connection.rollback();
inTransaction = false;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -231,16 +229,14 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
ResultSet resultSet = lastWindowFetchCommand.executeQuery();
if (resultSet.next()) {
lastWindow = resultSet.getLong(1);
- }
- else {
+ } else {
lastWindowInsertCommand.setString(1, appId);
lastWindowInsertCommand.setInt(2, operatorId);
lastWindowInsertCommand.setLong(3, -1);
lastWindowInsertCommand.executeUpdate();
}
return lastWindow;
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
@@ -253,8 +249,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
lastWindowUpdateCommand.setString(2, appId);
lastWindowUpdateCommand.setInt(3, operatorId);
lastWindowUpdateCommand.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -266,8 +261,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
lastWindowDeleteCommand.setString(1, appId);
lastWindowDeleteCommand.setInt(2, operatorId);
lastWindowDeleteCommand.executeUpdate();
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
index 1cf57c5..d34f99b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
@@ -27,15 +27,15 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.net.ftp.FTP;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ftp.FTPFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultOutputPort;
-
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
@@ -90,10 +90,10 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera
{
super.partitioned(partitions);
for (Partition<AbstractFileInputOperator<T>> partition : partitions.values()) {
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).host = host;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).port = port;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).userName = userName;
- ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).password = password;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).host = host;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).port = port;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).userName = userName;
+ ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).password = password;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 2aa658f..15a208f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -217,7 +217,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
/**
* File output counters.
*/
- protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
+ protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class);
protected StreamCodec<INPUT> streamCodec;
@@ -252,8 +252,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* accessed by a read or write.
* <p/>
* https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/>
- * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort.
- * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/>
+ * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a
+ * value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write
+ * operations, or during occasional read operations if writes are rare.<br/>
* This isn't the most effective way but adds a little bit of optimization.
*/
private Long expireStreamAfterAcessMillis;
@@ -275,8 +276,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
if (AbstractFileOutputOperator.this.streamCodec == null) {
return super.getStreamCodec();
- }
- else {
+ } else {
return streamCodec;
}
}
@@ -414,8 +414,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("rotating file at setup.");
rotate(seenFileName);
}
- }
- catch (IOException | ExecutionException e) {
+ } catch (IOException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@@ -441,8 +440,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* @param filename name of the actual file.
* @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
* latest open part file name.
- * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; otherwise
- * path of the actual file.
+ * @param filepath path of the file. When always writing to temp file, this is the path of the temp file;
+ * otherwise path of the actual file.
* @throws IOException
*/
private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
@@ -714,7 +713,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//Close all the streams you can
Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
- for(String seenFileName: openStreams.keySet()) {
+ for (String seenFileName : openStreams.keySet()) {
FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName);
try {
long start = System.currentTimeMillis();
@@ -1238,8 +1237,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName);
rename(srcPath, destPath);
} else if (fs.exists(srcPath)) {
- //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so
- //we just delete the tmp file.
+ /*if the destination and src both exists that means there was a failure between file rename and clearing the
+ endOffset so we just delete the tmp file*/
LOG.debug("deleting tmp {}", tmpFileName);
fs.delete(srcPath, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index fe9e35f..43445a7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -25,14 +25,22 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
-import javax.jms.*;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
-import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.mutable.MutableLong;
+
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,10 +51,9 @@ import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* This is the base implementation of a JMS input operator.<br/>
@@ -71,8 +78,9 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
* @since 0.3.2
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, ActivationListener<OperatorContext>,
- MessageListener, ExceptionListener, Operator.IdleTimeHandler, Operator.CheckpointListener
+public abstract class AbstractJMSInputOperator<T> extends JMSBase
+ implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
+ Operator.IdleTimeHandler, Operator.CheckpointListener
{
protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
@@ -102,8 +110,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
protected transient long currentWindowId;
protected transient int emitCount;
- private transient final Set<String> pendingAck;
- private transient final Lock lock;
+ private final transient Set<String> pendingAck;
+ private final transient Lock lock;
public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
@@ -129,8 +137,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
synchronized (lock) {
try {
return messageConsumed(message) && super.add(message);
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
LOG.error("message consumption", e);
throwable.set(e);
throw new RuntimeException(e);
@@ -162,10 +169,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
if (message.getJMSReplyTo() != null) { // Send reply only if the replyTo destination is set
- replyProducer.send(message.getJMSReplyTo(), getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
+ replyProducer.send(message.getJMSReplyTo(),
+ getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
}
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
LOG.error(ex.getLocalizedMessage());
throwable.set(ex);
throw new RuntimeException(ex);
@@ -199,8 +206,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
if (operatorRecoveredWindows != null) {
Arrays.sort(operatorRecoveredWindows);
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("fetching windows", e);
}
}
@@ -223,7 +229,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
pendingAck.add(message.getJMSMessageID());
MutableLong receivedCt = counters.getCounter(CounterKeys.RECEIVED);
receivedCt.increment();
- LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(), receivedCt.longValue());
+ LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(),
+ receivedCt.longValue());
return true;
}
@@ -239,11 +246,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
replyProducer = getSession().createProducer(null);
consumer = (isDurable() && isTopic()) ?
- getSession().createDurableSubscriber((Topic) getDestination(), consumerName) :
- getSession().createConsumer(getDestination());
+ getSession().createDurableSubscriber((Topic)getDestination(), consumerName) :
+ getSession().createConsumer(getDestination());
consumer.setMessageListener(this);
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -264,7 +270,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
@SuppressWarnings("unchecked")
- Map<String, T> recoveredData = (Map<String, T>) idempotentStorageManager.load(context.getId(), windowId);
+ Map<String, T> recoveredData = (Map<String, T>)idempotentStorageManager.load(context.getId(), windowId);
if (recoveredData == null) {
return;
}
@@ -272,8 +278,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
pendingAck.add(recoveredEntry.getKey());
emit(recoveredEntry.getValue());
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("replay", e);
}
}
@@ -306,8 +311,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
currentWindowRecoveryState.put(message.getJMSMessageID(), payload);
emit(payload);
}
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
throw new RuntimeException("processing msg", e);
}
}
@@ -320,12 +324,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
/* nothing to do here, so sleep for a while to avoid busy loop */
try {
Thread.sleep(spinMillis);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
- }
- else {
+ } else {
DTThrowable.rethrow(lthrowable);
}
}
@@ -338,7 +340,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
* acknowledged have been persisted because they wouldn't be redelivered. Also if they are persisted then
* they shouldn't be re-delivered because that would cause duplicates.<br/>
*
- * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is blocked.<br/>
+ * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is
+ * blocked.<br/>
*/
@Override
public void endWindow()
@@ -365,27 +368,24 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
}
ackCompleted = true;
pendingAck.clear();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
if (!ackCompleted) {
LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t);
}
DTThrowable.rethrow(t);
- }
- finally {
+ } finally {
if (stateSaved && !ackCompleted) {
try {
idempotentStorageManager.delete(context.getId(), currentWindowId);
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("unable to delete corrupted state", e);
}
}
}
}
emitCount = 0; //reset emit count
- }
- else if (operatorRecoveredWindows != null && currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
+ } else if (operatorRecoveredWindows != null &&
+ currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
//pendingAck is not cleared for the last replayed window of this operator. This is because there is
//still a chance that in the previous run the operator crashed after saving the state but before acknowledgement.
pendingAck.clear();
@@ -402,8 +402,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
if (isTransacted()) {
getSession().commit();
- }
- else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
+ } else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
lastMsg.acknowledge(); // acknowledge all consumed messages till now
}
}
@@ -418,8 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
{
try {
idempotentStorageManager.deleteUpTo(context.getId(), windowId);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("committing", e);
}
}
@@ -441,8 +439,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
consumer = null;
super.cleanup();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException("at cleanup", ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
index bbc8d0f..a72d127 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
@@ -37,12 +37,10 @@ public class JMSStringInputOperator extends AbstractJMSInputOperator<String>
public String convert(Message message) throws JMSException
{
if (message instanceof TextMessage) {
- return ((TextMessage) message).getText();
- }
- else if (message instanceof StreamMessage) {
- return ((StreamMessage) message).readString();
- }
- else {
+ return ((TextMessage)message).getText();
+ } else if (message instanceof StreamMessage) {
+ return ((StreamMessage)message).readString();
+ } else {
throw new IllegalArgumentException("Unhandled message type " + message.getClass().getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
index 33f7f8d..ab9a261 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -38,7 +37,7 @@ public class AvgAggregator implements SingleMetricAggregator, Serializable
double sum = 0;
for (Object value : metricValues) {
- sum += ((Number) value).doubleValue();
+ sum += ((Number)value).doubleValue();
}
return sum / metricValues.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
index 40d8c04..13988f9 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class DoubleMaxAggregator implements SingleMetricAggregator, Serializable
{
Double max = null;
for (Object value : metricValues) {
- double dval = ((Number) value).doubleValue();
+ double dval = ((Number)value).doubleValue();
if (max == null || dval > max) {
max = dval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
index 68f8f2a..6b228e8 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class FloatMaxAggregator implements SingleMetricAggregator, Serializable
{
Float max = null;
for (Object value : metricValues) {
- float fval = ((Number) value).floatValue();
+ float fval = ((Number)value).floatValue();
if (max == null || fval > max) {
max = fval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
index 71eaba0..2cd0264 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class IntMaxAggregator implements SingleMetricAggregator, Serializable
{
Integer max = null;
for (Object value : metricValues) {
- int ival = ((Number) value).intValue();
+ int ival = ((Number)value).intValue();
if (max == null || ival > max) {
max = ival;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
index 43a1abd..f0aab73 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
index d65e744..1b1b712 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class DoubleMinAggregator implements SingleMetricAggregator, Serializable
{
Double min = null;
for (Object value : metricValues) {
- double dval = ((Number) value).doubleValue();
+ double dval = ((Number)value).doubleValue();
if (min == null || dval < min) {
min = dval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
index bbe0861..6713911 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class FloatMinAggregator implements SingleMetricAggregator, Serializable
{
Float min = null;
for (Object metric : metricValues) {
- float fval = ((Number) metric).floatValue();
+ float fval = ((Number)metric).floatValue();
if (min == null || fval < min) {
min = fval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
index 76ffebc..72c610f 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class IntMinAggregator implements SingleMetricAggregator, Serializable
{
Integer min = null;
for (Object value : metricValues) {
- int ival = ((Number) value).intValue();
+ int ival = ((Number)value).intValue();
if (min == null || ival < min) {
min = ival;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
index 19a1dab..0deb808 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import com.datatorrent.api.annotation.Name;
-
import com.datatorrent.common.metric.SingleMetricAggregator;
/**
@@ -37,7 +36,7 @@ public class LongMinAggregator implements SingleMetricAggregator, Serializable
{
Long min = null;
for (Object value : metricValues) {
- long lval = ((Number) value).longValue();
+ long lval = ((Number)value).longValue();
if (min == null || lval < min) {
min = lval;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
index 0d5c646..4affef7 100644
--- a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
+++ b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
@@ -31,79 +31,89 @@ import com.datatorrent.netlet.util.Slice;
* Tests for {@link KryoSerializableStreamCodec}
*
*/
-public class KryoStreamCodecTest {
+public class KryoStreamCodecTest
+{
- public static class TestTuple {
- final Integer field;
+ public static class TestTuple
+ {
+ final Integer field;
- @SuppressWarnings("unused")
- private TestTuple(){
- this.field= 0;
- }
-
- public TestTuple(Integer x){
- this.field= Preconditions.checkNotNull(x,"x");
- }
+ @SuppressWarnings("unused")
+ private TestTuple()
+ {
+ this.field = 0;
+ }
- @Override
- public boolean equals(Object o) {
- return o.getClass()== this.getClass() && ((TestTuple)o).field.equals(this.field);
- }
+ public TestTuple(Integer x)
+ {
+ this.field = Preconditions.checkNotNull(x, "x");
+ }
- @Override
- public int hashCode() {
- return TestTuple.class.hashCode()^ this.field.hashCode();
- }
+ @Override
+ public boolean equals(Object o)
+ {
+ return o.getClass() == this.getClass() && ((TestTuple)o).field.equals(this.field);
}
- public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple> {
+ @Override
+ public int hashCode()
+ {
+ return TestTuple.class.hashCode() ^ this.field.hashCode();
+ }
+ }
- public TestKryoStreamCodec(){
- super();
- }
+ public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple>
+ {
- @Override
- public int getPartition(TestTuple testTuple) {
- return testTuple.field;
- }
+ public TestKryoStreamCodec()
+ {
+ super();
}
- @Test
- public void testSomeMethod() throws IOException
+ @Override
+ public int getPartition(TestTuple testTuple)
{
- TestKryoStreamCodec coder = new TestKryoStreamCodec();
- TestKryoStreamCodec decoder = new TestKryoStreamCodec();
+ return testTuple.field;
+ }
+ }
- KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
- Slice sliceOfObj = objCoder.toByteArray(10);
- Integer decodedObj = (Integer) objCoder.fromByteArray(sliceOfObj);
+ @Test
+ public void testSomeMethod() throws IOException
+ {
+ TestKryoStreamCodec coder = new TestKryoStreamCodec();
+ TestKryoStreamCodec decoder = new TestKryoStreamCodec();
- Assert.assertEquals("codec", decodedObj.intValue(), 10);
+ KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
+ Slice sliceOfObj = objCoder.toByteArray(10);
+ Integer decodedObj = (Integer)objCoder.fromByteArray(sliceOfObj);
- TestTuple tp= new TestTuple(5);
+ Assert.assertEquals("codec", decodedObj.intValue(), 10);
- Slice dsp1 = coder.toByteArray(tp);
- Slice dsp2 = coder.toByteArray(tp);
- Assert.assertEquals(dsp1, dsp2);
+ TestTuple tp = new TestTuple(5);
- Object tcObject1 = decoder.fromByteArray(dsp1);
- assert (tp.equals(tcObject1));
+ Slice dsp1 = coder.toByteArray(tp);
+ Slice dsp2 = coder.toByteArray(tp);
+ Assert.assertEquals(dsp1, dsp2);
- Object tcObject2 = decoder.fromByteArray(dsp2);
- assert (tp.equals(tcObject2));
+ Object tcObject1 = decoder.fromByteArray(dsp1);
+ assert (tp.equals(tcObject1));
- dsp1 = coder.toByteArray(tp);
- dsp2 = coder.toByteArray(tp);
- Assert.assertEquals(dsp1, dsp2);
- }
+ Object tcObject2 = decoder.fromByteArray(dsp2);
+ assert (tp.equals(tcObject2));
- @Test
- public void testFinalFieldSerialization() throws Exception{
- TestTuple t1 = new TestTuple(5);
- TestKryoStreamCodec codec= new TestKryoStreamCodec();
+ dsp1 = coder.toByteArray(tp);
+ dsp2 = coder.toByteArray(tp);
+ Assert.assertEquals(dsp1, dsp2);
+ }
- Slice dsp = codec.toByteArray(t1);
- TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
- Assert.assertEquals("", t1.field, t2.field);
- }
+ @Test
+ public void testFinalFieldSerialization() throws Exception
+ {
+ TestTuple t1 = new TestTuple(5);
+ TestKryoStreamCodec codec = new TestKryoStreamCodec();
+
+ Slice dsp = codec.toByteArray(t1);
+ TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
+ Assert.assertEquals("", t1.field, t2.field);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
index 9ee536e..caa02bf 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
@@ -62,7 +62,7 @@ public class CacheManagerTest
@Override
public boolean apply(@Nullable Object key)
{
- return key != null && (Integer) key <= 5;
+ return key != null && (Integer)key <= 5;
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
index 9078271..46f49e2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
@@ -18,7 +18,12 @@
*/
package com.datatorrent.lib.db.jdbc;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
@@ -36,7 +41,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -49,8 +53,9 @@ public class JDBCLookupCacheBackedOperatorTest
private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName();
protected static final String TABLE_NAME = "Test_Lookup_Cache";
- protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = new TestJDBCLookupCacheBackedOperator();
- protected static CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+ protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator =
+ new TestJDBCLookupCacheBackedOperator();
+ protected static CollectorTestSink<Object> sink = new CollectorTestSink<>();
protected static final Map<Integer, String> mapping = Maps.newHashMap();
static {
@@ -61,9 +66,10 @@ public class JDBCLookupCacheBackedOperatorTest
mapping.put(5, "five");
}
- protected static transient final Logger logger = LoggerFactory.getLogger(JDBCLookupCacheBackedOperatorTest.class);
+ protected static final transient Logger logger = LoggerFactory.getLogger(
+ JDBCLookupCacheBackedOperatorTest.class);
- private final static Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<List<Object>>();
+ private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>();
public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String>
{
@@ -83,15 +89,15 @@ public class JDBCLookupCacheBackedOperatorTest
@Override
protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException
{
- putStatement.setInt(1, (Integer) key);
- putStatement.setString(2, (String) value);
+ putStatement.setInt(1, (Integer)key);
+ putStatement.setString(2, (String)value);
}
@Override
protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException
{
- getStatement.setInt(1, (Integer) key);
+ getStatement.setInt(1, (Integer)key);
}
@Override
@@ -121,8 +127,7 @@ public class JDBCLookupCacheBackedOperatorTest
List<Object> values = super.getAll(keys);
try {
bulkValuesExchanger.exchange(values);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
return values;
@@ -164,7 +169,7 @@ public class JDBCLookupCacheBackedOperatorTest
Statement stmt = con.createStatement();
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
- + " (col1 INTEGER, col2 VARCHAR(20))";
+ + " (col1 INTEGER, col2 VARCHAR(20))";
stmt.executeUpdate(createTable);
stmt.executeUpdate("Delete from " + TABLE_NAME);
@@ -172,8 +177,8 @@ public class JDBCLookupCacheBackedOperatorTest
// populate the database
for (Map.Entry<Integer, String> entry : mapping.entrySet()) {
String insert = "INSERT INTO " + TABLE_NAME
- + " (col1, col2) VALUES (" + entry.getKey() + ", '"
- + entry.getValue() + "')";
+ + " (col1, col2) VALUES (" + entry.getKey() + ", '"
+ + entry.getValue() + "')";
stmt.executeUpdate(insert);
}